Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph...
authorLinus Torvalds <torvalds@linux-foundation.org>
Tue, 9 Jul 2013 19:39:10 +0000 (12:39 -0700)
committerLinus Torvalds <torvalds@linux-foundation.org>
Tue, 9 Jul 2013 19:39:10 +0000 (12:39 -0700)
Pull Ceph updates from Sage Weil:
 "There is some follow-on RBD cleanup after the last window's code drop,
  a series from Yan fixing multi-mds behavior in cephfs, and then a
  sprinkling of bug fixes all around.  Some warnings, sleeping while
  atomic, a null dereference, and cleanups"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (36 commits)
  libceph: fix invalid unsigned->signed conversion for timespec encoding
  libceph: call r_unsafe_callback when unsafe reply is received
  ceph: fix race between cap issue and revoke
  ceph: fix cap revoke race
  ceph: fix pending vmtruncate race
  ceph: avoid accessing invalid memory
  libceph: Fix NULL pointer dereference in auth client code
  ceph: Reconstruct the func ceph_reserve_caps.
  ceph: Free mdsc if alloc mdsc->mdsmap failed.
  ceph: remove sb_start/end_write in ceph_aio_write.
  ceph: avoid meaningless calling ceph_caps_revoking if sync_mode == WB_SYNC_ALL.
  ceph: fix sleeping function called from invalid context.
  ceph: move inode to proper flushing list when auth MDS changes
  rbd: fix a couple warnings
  ceph: clear migrate seq when MDS restarts
  ceph: check migrate seq before changing auth cap
  ceph: fix race between page writeback and truncate
  ceph: reset iov_len when discarding cap release messages
  ceph: fix cap release race
  libceph: fix truncate size calculation
  ...

15 files changed:
drivers/block/rbd.c
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/locks.c
fs/ceph/mds_client.c
fs/ceph/mdsmap.c
fs/ceph/super.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/decode.h
include/linux/ceph/osd_client.h
net/ceph/auth_none.c
net/ceph/osd_client.c

index aff789d..4ad2ad9 100644 (file)
@@ -372,7 +372,7 @@ enum rbd_dev_flags {
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 };
 
-static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
+static DEFINE_MUTEX(client_mutex);     /* Serialize client creation */
 
 static LIST_HEAD(rbd_dev_list);    /* devices */
 static DEFINE_SPINLOCK(rbd_dev_list_lock);
@@ -489,10 +489,8 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
        if (removing)
                return -ENOENT;
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
-       mutex_unlock(&ctl_mutex);
 
        return 0;
 }
@@ -507,9 +505,7 @@ static void rbd_release(struct gendisk *disk, fmode_t mode)
        spin_unlock_irq(&rbd_dev->lock);
        rbd_assert(open_count_before > 0);
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        put_device(&rbd_dev->dev);
-       mutex_unlock(&ctl_mutex);
 }
 
 static const struct block_device_operations rbd_bd_ops = {
@@ -520,7 +516,7 @@ static const struct block_device_operations rbd_bd_ops = {
 
 /*
  * Initialize an rbd client instance.  Success or not, this function
- * consumes ceph_opts.
+ * consumes ceph_opts.  Caller holds client_mutex.
  */
 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
 {
@@ -535,30 +531,25 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
        kref_init(&rbdc->kref);
        INIT_LIST_HEAD(&rbdc->node);
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
        rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
        if (IS_ERR(rbdc->client))
-               goto out_mutex;
+               goto out_rbdc;
        ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
 
        ret = ceph_open_session(rbdc->client);
        if (ret < 0)
-               goto out_err;
+               goto out_client;
 
        spin_lock(&rbd_client_list_lock);
        list_add_tail(&rbdc->node, &rbd_client_list);
        spin_unlock(&rbd_client_list_lock);
 
-       mutex_unlock(&ctl_mutex);
        dout("%s: rbdc %p\n", __func__, rbdc);
 
        return rbdc;
-
-out_err:
+out_client:
        ceph_destroy_client(rbdc->client);
-out_mutex:
-       mutex_unlock(&ctl_mutex);
+out_rbdc:
        kfree(rbdc);
 out_opt:
        if (ceph_opts)
@@ -682,11 +673,13 @@ static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
        struct rbd_client *rbdc;
 
+       mutex_lock_nested(&client_mutex, SINGLE_DEPTH_NESTING);
        rbdc = rbd_client_find(ceph_opts);
        if (rbdc)       /* using an existing client */
                ceph_destroy_options(ceph_opts);
        else
                rbdc = rbd_client_create(ceph_opts);
+       mutex_unlock(&client_mutex);
 
        return rbdc;
 }
@@ -840,7 +833,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
 
        /* We won't fail any more, fill in the header */
 
-       down_write(&rbd_dev->header_rwsem);
        if (first_time) {
                header->object_prefix = object_prefix;
                header->obj_order = ondisk->options.order;
@@ -869,8 +861,6 @@ static int rbd_header_from_disk(struct rbd_device *rbd_dev,
                if (rbd_dev->mapping.size != header->image_size)
                        rbd_dev->mapping.size = header->image_size;
 
-       up_write(&rbd_dev->header_rwsem);
-
        return 0;
 out_2big:
        ret = -EIO;
@@ -1126,6 +1116,7 @@ static void zero_bio_chain(struct bio *chain, int start_ofs)
                                buf = bvec_kmap_irq(bv, &flags);
                                memset(buf + remainder, 0,
                                       bv->bv_len - remainder);
+                               flush_dcache_page(bv->bv_page);
                                bvec_kunmap_irq(buf, &flags);
                        }
                        pos += bv->bv_len;
@@ -1153,11 +1144,12 @@ static void zero_pages(struct page **pages, u64 offset, u64 end)
                unsigned long flags;
                void *kaddr;
 
-               page_offset = (size_t)(offset & ~PAGE_MASK);
-               length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
+               page_offset = offset & ~PAGE_MASK;
+               length = min_t(size_t, PAGE_SIZE - page_offset, end - offset);
                local_irq_save(flags);
                kaddr = kmap_atomic(*page);
                memset(kaddr + page_offset, 0, length);
+               flush_dcache_page(*page);
                kunmap_atomic(kaddr);
                local_irq_restore(flags);
 
@@ -2171,9 +2163,9 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        struct rbd_obj_request *obj_request = NULL;
        struct rbd_obj_request *next_obj_request;
        bool write_request = img_request_write_test(img_request);
-       struct bio *bio_list;
+       struct bio *bio_list = 0;
        unsigned int bio_offset = 0;
-       struct page **pages;
+       struct page **pages = 0;
        u64 img_offset;
        u64 resid;
        u16 opcode;
@@ -2535,6 +2527,7 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
         */
        orig_request = obj_request->obj_request;
        obj_request->obj_request = NULL;
+       rbd_obj_request_put(orig_request);
        rbd_assert(orig_request);
        rbd_assert(orig_request->img_request);
 
@@ -2555,7 +2548,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
        if (!rbd_dev->parent_overlap) {
                struct ceph_osd_client *osdc;
 
-               rbd_obj_request_put(orig_request);
                osdc = &rbd_dev->rbd_client->client->osdc;
                result = rbd_obj_request_submit(osdc, orig_request);
                if (!result)
@@ -2585,7 +2577,6 @@ static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
 out:
        if (orig_request->result)
                rbd_obj_request_complete(orig_request);
-       rbd_obj_request_put(orig_request);
 }
 
 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
@@ -2859,7 +2850,7 @@ static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
                (unsigned int)opcode);
        ret = rbd_dev_refresh(rbd_dev);
        if (ret)
-               rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
+               rbd_warn(rbd_dev, "header refresh error (%d)\n", ret);
 
        rbd_obj_notify_ack(rbd_dev, notify_id);
 }
@@ -3339,8 +3330,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
        int ret;
 
        rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
+       down_write(&rbd_dev->header_rwsem);
        mapping_size = rbd_dev->mapping.size;
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        if (rbd_dev->image_format == 1)
                ret = rbd_dev_v1_header_info(rbd_dev);
        else
@@ -3349,7 +3340,8 @@ static int rbd_dev_refresh(struct rbd_device *rbd_dev)
        /* If it's a mapped snapshot, validate its EXISTS flag */
 
        rbd_exists_validate(rbd_dev);
-       mutex_unlock(&ctl_mutex);
+       up_write(&rbd_dev->header_rwsem);
+
        if (mapping_size != rbd_dev->mapping.size) {
                sector_t size;
 
@@ -3813,6 +3805,7 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        void *end;
        u64 pool_id;
        char *image_id;
+       u64 snap_id;
        u64 overlap;
        int ret;
 
@@ -3872,24 +3865,56 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
                        (unsigned long long)pool_id, U32_MAX);
                goto out_err;
        }
-       parent_spec->pool_id = pool_id;
 
        image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
        if (IS_ERR(image_id)) {
                ret = PTR_ERR(image_id);
                goto out_err;
        }
-       parent_spec->image_id = image_id;
-       ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+       ceph_decode_64_safe(&p, end, snap_id, out_err);
        ceph_decode_64_safe(&p, end, overlap, out_err);
 
-       if (overlap) {
-               rbd_spec_put(rbd_dev->parent_spec);
+       /*
+        * The parent won't change (except when the clone is
+        * flattened, already handled that).  So we only need to
+        * record the parent spec if we have not already done so.
+        */
+       if (!rbd_dev->parent_spec) {
+               parent_spec->pool_id = pool_id;
+               parent_spec->image_id = image_id;
+               parent_spec->snap_id = snap_id;
                rbd_dev->parent_spec = parent_spec;
                parent_spec = NULL;     /* rbd_dev now owns this */
-               rbd_dev->parent_overlap = overlap;
-       } else {
-               rbd_warn(rbd_dev, "ignoring parent of clone with overlap 0\n");
+       }
+
+       /*
+        * We always update the parent overlap.  If it's zero we
+        * treat it specially.
+        */
+       rbd_dev->parent_overlap = overlap;
+       smp_mb();
+       if (!overlap) {
+
+               /* A null parent_spec indicates it's the initial probe */
+
+               if (parent_spec) {
+                       /*
+                        * The overlap has become zero, so the clone
+                        * must have been resized down to 0 at some
+                        * point.  Treat this the same as a flatten.
+                        */
+                       rbd_dev_parent_put(rbd_dev);
+                       pr_info("%s: clone image now standalone\n",
+                               rbd_dev->disk->disk_name);
+               } else {
+                       /*
+                        * For the initial probe, if we find the
+                        * overlap is zero we just pretend there was
+                        * no parent image.
+                        */
+                       rbd_warn(rbd_dev, "ignoring parent of "
+                                               "clone with overlap 0\n");
+               }
        }
 out:
        ret = 0;
@@ -4245,16 +4270,14 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
        bool first_time = rbd_dev->header.object_prefix == NULL;
        int ret;
 
-       down_write(&rbd_dev->header_rwsem);
-
        ret = rbd_dev_v2_image_size(rbd_dev);
        if (ret)
-               goto out;
+               return ret;
 
        if (first_time) {
                ret = rbd_dev_v2_header_onetime(rbd_dev);
                if (ret)
-                       goto out;
+                       return ret;
        }
 
        /*
@@ -4269,7 +4292,7 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 
                ret = rbd_dev_v2_parent_info(rbd_dev);
                if (ret)
-                       goto out;
+                       return ret;
 
                /*
                 * Print a warning if this is the initial probe and
@@ -4290,8 +4313,6 @@ static int rbd_dev_v2_header_info(struct rbd_device *rbd_dev)
 
        ret = rbd_dev_v2_snap_context(rbd_dev);
        dout("rbd_dev_v2_snap_context returned %d\n", ret);
-out:
-       up_write(&rbd_dev->header_rwsem);
 
        return ret;
 }
@@ -4301,8 +4322,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        struct device *dev;
        int ret;
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
        dev = &rbd_dev->dev;
        dev->bus = &rbd_bus_type;
        dev->type = &rbd_device_type;
@@ -4311,8 +4330,6 @@ static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
        dev_set_name(dev, "%d", rbd_dev->dev_id);
        ret = device_register(dev);
 
-       mutex_unlock(&ctl_mutex);
-
        return ret;
 }
 
@@ -5059,23 +5076,6 @@ err_out_module:
        return (ssize_t)rc;
 }
 
-static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
-{
-       struct list_head *tmp;
-       struct rbd_device *rbd_dev;
-
-       spin_lock(&rbd_dev_list_lock);
-       list_for_each(tmp, &rbd_dev_list) {
-               rbd_dev = list_entry(tmp, struct rbd_device, node);
-               if (rbd_dev->dev_id == dev_id) {
-                       spin_unlock(&rbd_dev_list_lock);
-                       return rbd_dev;
-               }
-       }
-       spin_unlock(&rbd_dev_list_lock);
-       return NULL;
-}
-
 static void rbd_dev_device_release(struct device *dev)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
@@ -5120,8 +5120,10 @@ static ssize_t rbd_remove(struct bus_type *bus,
                          size_t count)
 {
        struct rbd_device *rbd_dev = NULL;
-       int target_id;
+       struct list_head *tmp;
+       int dev_id;
        unsigned long ul;
+       bool already = false;
        int ret;
 
        ret = strict_strtoul(buf, 10, &ul);
@@ -5129,37 +5131,40 @@ static ssize_t rbd_remove(struct bus_type *bus,
                return ret;
 
        /* convert to int; abort if we lost anything in the conversion */
-       target_id = (int) ul;
-       if (target_id != ul)
+       dev_id = (int)ul;
+       if (dev_id != ul)
                return -EINVAL;
 
-       mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-
-       rbd_dev = __rbd_get_dev(target_id);
-       if (!rbd_dev) {
-               ret = -ENOENT;
-               goto done;
+       ret = -ENOENT;
+       spin_lock(&rbd_dev_list_lock);
+       list_for_each(tmp, &rbd_dev_list) {
+               rbd_dev = list_entry(tmp, struct rbd_device, node);
+               if (rbd_dev->dev_id == dev_id) {
+                       ret = 0;
+                       break;
+               }
+       }
+       if (!ret) {
+               spin_lock_irq(&rbd_dev->lock);
+               if (rbd_dev->open_count)
+                       ret = -EBUSY;
+               else
+                       already = test_and_set_bit(RBD_DEV_FLAG_REMOVING,
+                                                       &rbd_dev->flags);
+               spin_unlock_irq(&rbd_dev->lock);
        }
+       spin_unlock(&rbd_dev_list_lock);
+       if (ret < 0 || already)
+               return ret;
 
-       spin_lock_irq(&rbd_dev->lock);
-       if (rbd_dev->open_count)
-               ret = -EBUSY;
-       else
-               set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
-       spin_unlock_irq(&rbd_dev->lock);
-       if (ret < 0)
-               goto done;
        rbd_bus_del_dev(rbd_dev);
        ret = rbd_dev_header_watch_sync(rbd_dev, false);
        if (ret)
                rbd_warn(rbd_dev, "failed to cancel watch event (%d)\n", ret);
        rbd_dev_image_release(rbd_dev);
        module_put(THIS_MODULE);
-       ret = count;
-done:
-       mutex_unlock(&ctl_mutex);
 
-       return ret;
+       return count;
 }
 
 /*
@@ -5267,6 +5272,7 @@ static void __exit rbd_exit(void)
 module_init(rbd_init);
 module_exit(rbd_exit);
 
+MODULE_AUTHOR("Alex Elder <elder@inktank.com>");
 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
 MODULE_DESCRIPTION("rados block device");
index 38b5c1b..5318a3b 100644 (file)
@@ -439,13 +439,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        struct ceph_inode_info *ci;
        struct ceph_fs_client *fsc;
        struct ceph_osd_client *osdc;
-       loff_t page_off = page_offset(page);
-       int len = PAGE_CACHE_SIZE;
-       loff_t i_size;
-       int err = 0;
        struct ceph_snap_context *snapc, *oldest;
-       u64 snap_size = 0;
+       loff_t page_off = page_offset(page);
        long writeback_stat;
+       u64 truncate_size, snap_size = 0;
+       u32 truncate_seq;
+       int err = 0, len = PAGE_CACHE_SIZE;
 
        dout("writepage %p idx %lu\n", page, page->index);
 
@@ -475,13 +474,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        }
        ceph_put_snap_context(oldest);
 
+       spin_lock(&ci->i_ceph_lock);
+       truncate_seq = ci->i_truncate_seq;
+       truncate_size = ci->i_truncate_size;
+       if (!snap_size)
+               snap_size = i_size_read(inode);
+       spin_unlock(&ci->i_ceph_lock);
+
        /* is this a partial page at end of file? */
-       if (snap_size)
-               i_size = snap_size;
-       else
-               i_size = i_size_read(inode);
-       if (i_size < page_off + len)
-               len = i_size - page_off;
+       if (page_off >= snap_size) {
+               dout("%p page eof %llu\n", page, snap_size);
+               goto out;
+       }
+       if (snap_size < page_off + len)
+               len = snap_size - page_off;
 
        dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
             inode, page, page->index, page_off, len, snapc);
@@ -495,7 +501,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
                                   &ci->i_layout, snapc,
                                   page_off, len,
-                                  ci->i_truncate_seq, ci->i_truncate_size,
+                                  truncate_seq, truncate_size,
                                   &inode->i_mtime, &page, 1);
        if (err < 0) {
                dout("writepage setting page/mapping error %d %p\n", err, page);
@@ -632,25 +638,6 @@ static void writepages_finish(struct ceph_osd_request *req,
        ceph_osdc_put_request(req);
 }
 
-static struct ceph_osd_request *
-ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
-                               struct ceph_snap_context *snapc, int num_ops)
-{
-       struct ceph_fs_client *fsc;
-       struct ceph_inode_info *ci;
-       struct ceph_vino vino;
-
-       fsc = ceph_inode_to_client(inode);
-       ci = ceph_inode(inode);
-       vino = ceph_vino(inode);
-       /* BUG_ON(vino.snap != CEPH_NOSNAP); */
-
-       return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                       vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
-                       CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
-                       snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
-}
-
 /*
  * initiate async writeback
  */
@@ -659,7 +646,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 {
        struct inode *inode = mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_vino vino = ceph_vino(inode);
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
@@ -671,22 +659,22 @@ static int ceph_writepages_start(struct address_space *mapping,
        unsigned wsize = 1 << inode->i_blkbits;
        struct ceph_osd_request *req = NULL;
        int do_sync;
-       u64 snap_size;
+       u64 truncate_size, snap_size;
+       u32 truncate_seq;
 
        /*
         * Include a 'sync' in the OSD request if this is a data
         * integrity write (e.g., O_SYNC write or fsync()), or if our
         * cap is being revoked.
         */
-       do_sync = wbc->sync_mode == WB_SYNC_ALL;
-       if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
+       if ((wbc->sync_mode == WB_SYNC_ALL) ||
+               ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
                do_sync = 1;
        dout("writepages_start %p dosync=%d (mode=%s)\n",
             inode, do_sync,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-       fsc = ceph_inode_to_client(inode);
        if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
                pr_warning("writepage_start %p on forced umount\n", inode);
                return -EIO; /* we're in a forced umount, don't write! */
@@ -729,6 +717,14 @@ retry:
                snap_size = i_size_read(inode);
        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
             snapc, snapc->seq, snapc->num_snaps);
+
+       spin_lock(&ci->i_ceph_lock);
+       truncate_seq = ci->i_truncate_seq;
+       truncate_size = ci->i_truncate_size;
+       if (!snap_size)
+               snap_size = i_size_read(inode);
+       spin_unlock(&ci->i_ceph_lock);
+
        if (last_snapc && snapc != last_snapc) {
                /* if we switched to a newer snapc, restart our scan at the
                 * start of the original file range. */
@@ -740,7 +736,6 @@ retry:
 
        while (!done && index <= end) {
                int num_ops = do_sync ? 2 : 1;
-               struct ceph_vino vino;
                unsigned i;
                int first;
                pgoff_t next;
@@ -834,17 +829,18 @@ get_more_pages:
                         * that it will use.
                         */
                        if (locked_pages == 0) {
-                               size_t size;
-
                                BUG_ON(pages);
-
                                /* prepare async write request */
                                offset = (u64)page_offset(page);
                                len = wsize;
-                               req = ceph_writepages_osd_request(inode,
-                                                       offset, &len, snapc,
-                                                       num_ops);
-
+                               req = ceph_osdc_new_request(&fsc->client->osdc,
+                                                       &ci->i_layout, vino,
+                                                       offset, &len, num_ops,
+                                                       CEPH_OSD_OP_WRITE,
+                                                       CEPH_OSD_FLAG_WRITE |
+                                                       CEPH_OSD_FLAG_ONDISK,
+                                                       snapc, truncate_seq,
+                                                       truncate_size, true);
                                if (IS_ERR(req)) {
                                        rc = PTR_ERR(req);
                                        unlock_page(page);
@@ -855,8 +851,8 @@ get_more_pages:
                                req->r_inode = inode;
 
                                max_pages = calc_pages_for(0, (u64)len);
-                               size = max_pages * sizeof (*pages);
-                               pages = kmalloc(size, GFP_NOFS);
+                               pages = kmalloc(max_pages * sizeof (*pages),
+                                               GFP_NOFS);
                                if (!pages) {
                                        pool = fsc->wb_pagevec_pool;
                                        pages = mempool_alloc(pool, GFP_NOFS);
index da0f9b8..25442b4 100644 (file)
@@ -147,7 +147,7 @@ void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
        spin_unlock(&mdsc->caps_list_lock);
 }
 
-int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+void ceph_reserve_caps(struct ceph_mds_client *mdsc,
                      struct ceph_cap_reservation *ctx, int need)
 {
        int i;
@@ -155,7 +155,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
        int have;
        int alloc = 0;
        LIST_HEAD(newcaps);
-       int ret = 0;
 
        dout("reserve caps ctx=%p need=%d\n", ctx, need);
 
@@ -174,14 +173,15 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
 
        for (i = have; i < need; i++) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
-               if (!cap) {
-                       ret = -ENOMEM;
-                       goto out_alloc_count;
-               }
+               if (!cap)
+                       break;
                list_add(&cap->caps_item, &newcaps);
                alloc++;
        }
-       BUG_ON(have + alloc != need);
+       /* we didn't manage to reserve as much as we needed */
+       if (have + alloc != need)
+               pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
+                       ctx, need, have + alloc);
 
        spin_lock(&mdsc->caps_list_lock);
        mdsc->caps_total_count += alloc;
@@ -197,13 +197,6 @@ int ceph_reserve_caps(struct ceph_mds_client *mdsc,
        dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
             ctx, mdsc->caps_total_count, mdsc->caps_use_count,
             mdsc->caps_reserve_count, mdsc->caps_avail_count);
-       return 0;
-
-out_alloc_count:
-       /* we didn't manage to reserve as much as we needed */
-       pr_warning("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
-                  ctx, need, have);
-       return ret;
 }
 
 int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
@@ -612,9 +605,11 @@ retry:
                __cap_delay_requeue(mdsc, ci);
        }
 
-       if (flags & CEPH_CAP_FLAG_AUTH)
-               ci->i_auth_cap = cap;
-       else if (ci->i_auth_cap == cap) {
+       if (flags & CEPH_CAP_FLAG_AUTH) {
+               if (ci->i_auth_cap == NULL ||
+                   ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0)
+                       ci->i_auth_cap = cap;
+       } else if (ci->i_auth_cap == cap) {
                ci->i_auth_cap = NULL;
                spin_lock(&mdsc->cap_dirty_lock);
                if (!list_empty(&ci->i_dirty_item)) {
@@ -695,6 +690,15 @@ int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
                if (implemented)
                        *implemented |= cap->implemented;
        }
+       /*
+        * exclude caps issued by non-auth MDS, but are being revoked
+        * by the auth MDS. The non-auth MDS should be revoking/exporting
+        * these caps, but the message is delayed.
+        */
+       if (ci->i_auth_cap) {
+               cap = ci->i_auth_cap;
+               have &= ~cap->implemented | cap->issued;
+       }
        return have;
 }
 
@@ -802,22 +806,28 @@ int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
 /*
  * Return true if mask caps are currently being revoked by an MDS.
  */
-int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
+int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
+                              struct ceph_cap *ocap, int mask)
 {
-       struct inode *inode = &ci->vfs_inode;
        struct ceph_cap *cap;
        struct rb_node *p;
-       int ret = 0;
 
-       spin_lock(&ci->i_ceph_lock);
        for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
                cap = rb_entry(p, struct ceph_cap, ci_node);
-               if (__cap_is_valid(cap) &&
-                   (cap->implemented & ~cap->issued & mask)) {
-                       ret = 1;
-                       break;
-               }
+               if (cap != ocap && __cap_is_valid(cap) &&
+                   (cap->implemented & ~cap->issued & mask))
+                       return 1;
        }
+       return 0;
+}
+
+int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
+{
+       struct inode *inode = &ci->vfs_inode;
+       int ret;
+
+       spin_lock(&ci->i_ceph_lock);
+       ret = __ceph_caps_revoking_other(ci, NULL, mask);
        spin_unlock(&ci->i_ceph_lock);
        dout("ceph_caps_revoking %p %s = %d\n", inode,
             ceph_cap_string(mask), ret);
@@ -1980,8 +1990,15 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
        cap = ci->i_auth_cap;
        dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
             ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);
+
        __ceph_flush_snaps(ci, &session, 1);
+
        if (ci->i_flushing_caps) {
+               spin_lock(&mdsc->cap_dirty_lock);
+               list_move_tail(&ci->i_flushing_item,
+                              &cap->session->s_cap_flushing);
+               spin_unlock(&mdsc->cap_dirty_lock);
+
                delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
                                     __ceph_caps_used(ci),
                                     __ceph_caps_wanted(ci),
@@ -2055,7 +2072,11 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        /* finish pending truncate */
        while (ci->i_truncate_pending) {
                spin_unlock(&ci->i_ceph_lock);
-               __ceph_do_pending_vmtruncate(inode, !(need & CEPH_CAP_FILE_WR));
+               if (!(need & CEPH_CAP_FILE_WR))
+                       mutex_lock(&inode->i_mutex);
+               __ceph_do_pending_vmtruncate(inode);
+               if (!(need & CEPH_CAP_FILE_WR))
+                       mutex_unlock(&inode->i_mutex);
                spin_lock(&ci->i_ceph_lock);
        }
 
@@ -2473,6 +2494,11 @@ static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
        } else {
                dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
                     ceph_cap_string(newcaps));
+               /* is the non-auth MDS revoking the newly granted caps? */
+               if (cap == ci->i_auth_cap &&
+                   __ceph_caps_revoking_other(ci, cap, newcaps))
+                   check_caps = 2;
+
                cap->issued = newcaps;
                cap->implemented |= newcaps; /* add bits only, to
                                              * avoid stepping on a
@@ -3042,21 +3068,19 @@ int ceph_encode_inode_release(void **p, struct inode *inode,
                     (cap->issued & unless) == 0)) {
                        if ((cap->issued & drop) &&
                            (cap->issued & unless) == 0) {
-                               dout("encode_inode_release %p cap %p %s -> "
-                                    "%s\n", inode, cap,
+                               int wanted = __ceph_caps_wanted(ci);
+                               if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
+                                       wanted |= cap->mds_wanted;
+                               dout("encode_inode_release %p cap %p "
+                                    "%s -> %s, wanted %s -> %s\n", inode, cap,
                                     ceph_cap_string(cap->issued),
-                                    ceph_cap_string(cap->issued & ~drop));
+                                    ceph_cap_string(cap->issued & ~drop),
+                                    ceph_cap_string(cap->mds_wanted),
+                                    ceph_cap_string(wanted));
+
                                cap->issued &= ~drop;
                                cap->implemented &= ~drop;
-                               if (ci->i_ceph_flags & CEPH_I_NODELAY) {
-                                       int wanted = __ceph_caps_wanted(ci);
-                                       dout("  wanted %s -> %s (act %s)\n",
-                                            ceph_cap_string(cap->mds_wanted),
-                                            ceph_cap_string(cap->mds_wanted &
-                                                            ~wanted),
-                                            ceph_cap_string(wanted));
-                                       cap->mds_wanted &= wanted;
-                               }
+                               cap->mds_wanted = wanted;
                        } else {
                                dout("encode_inode_release %p cap %p %s"
                                     " (force)\n", inode, cap,
index 16c989d..2ddf061 100644 (file)
@@ -716,7 +716,6 @@ static ssize_t ceph_aio_write(struct kiocb *iocb, const struct iovec *iov,
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
-       sb_start_write(inode->i_sb);
        mutex_lock(&inode->i_mutex);
        hold_mutex = true;
 
@@ -809,7 +808,6 @@ retry_snap:
 out:
        if (hold_mutex)
                mutex_unlock(&inode->i_mutex);
-       sb_end_write(inode->i_sb);
        current->backing_dev_info = NULL;
 
        return written ? written : err;
@@ -824,7 +822,7 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
        int ret;
 
        mutex_lock(&inode->i_mutex);
-       __ceph_do_pending_vmtruncate(inode, false);
+       __ceph_do_pending_vmtruncate(inode);
 
        if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
                ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE);
index bd2289a..f3a2abf 100644 (file)
@@ -1465,7 +1465,9 @@ static void ceph_vmtruncate_work(struct work_struct *work)
        struct inode *inode = &ci->vfs_inode;
 
        dout("vmtruncate_work %p\n", inode);
-       __ceph_do_pending_vmtruncate(inode, true);
+       mutex_lock(&inode->i_mutex);
+       __ceph_do_pending_vmtruncate(inode);
+       mutex_unlock(&inode->i_mutex);
        iput(inode);
 }
 
@@ -1492,7 +1494,7 @@ void ceph_queue_vmtruncate(struct inode *inode)
  * Make sure any pending truncation is applied before doing anything
  * that may depend on it.
  */
-void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock)
+void __ceph_do_pending_vmtruncate(struct inode *inode)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        u64 to;
@@ -1525,11 +1527,7 @@ retry:
             ci->i_truncate_pending, to);
        spin_unlock(&ci->i_ceph_lock);
 
-       if (needlock)
-               mutex_lock(&inode->i_mutex);
        truncate_inode_pages(inode->i_mapping, to);
-       if (needlock)
-               mutex_unlock(&inode->i_mutex);
 
        spin_lock(&ci->i_ceph_lock);
        if (to == ci->i_truncate_size) {
@@ -1588,7 +1586,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
 
-       __ceph_do_pending_vmtruncate(inode, false);
+       __ceph_do_pending_vmtruncate(inode);
 
        err = inode_change_ok(inode, attr);
        if (err != 0)
@@ -1770,7 +1768,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
             ceph_cap_string(dirtied), mask);
 
        ceph_mdsc_put_request(req);
-       __ceph_do_pending_vmtruncate(inode, false);
+       __ceph_do_pending_vmtruncate(inode);
        return err;
 out:
        spin_unlock(&ci->i_ceph_lock);
index 690f73f..ae6d14e 100644 (file)
@@ -169,7 +169,7 @@ int ceph_flock(struct file *file, int cmd, struct file_lock *fl)
 }
 
 /**
- * Must be called with BKL already held. Fills in the passed
+ * Must be called with lock_flocks() already held. Fills in the passed
  * counter variables, so you can prepare pagelist metadata before calling
  * ceph_encode_locks.
  */
index 99890b0..187bf21 100644 (file)
@@ -1391,6 +1391,7 @@ static void discard_cap_releases(struct ceph_mds_client *mdsc,
        num = le32_to_cpu(head->num);
        dout("discard_cap_releases mds%d %p %u\n", session->s_mds, msg, num);
        head->num = cpu_to_le32(0);
+       msg->front.iov_len = sizeof(*head);
        session->s_num_cap_releases += num;
 
        /* requeue completed messages */
@@ -2454,6 +2455,7 @@ static int encode_caps_cb(struct inode *inode, struct ceph_cap *cap,
        spin_lock(&ci->i_ceph_lock);
        cap->seq = 0;        /* reset cap seq */
        cap->issue_seq = 0;  /* and issue_seq */
+       cap->mseq = 0;       /* and migrate_seq */
 
        if (recon_state->flock) {
                rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -3040,8 +3042,10 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        fsc->mdsc = mdsc;
        mutex_init(&mdsc->mutex);
        mdsc->mdsmap = kzalloc(sizeof(*mdsc->mdsmap), GFP_NOFS);
-       if (mdsc->mdsmap == NULL)
+       if (mdsc->mdsmap == NULL) {
+               kfree(mdsc);
                return -ENOMEM;
+       }
 
        init_completion(&mdsc->safe_umount_waiters);
        init_waitqueue_head(&mdsc->session_close_wq);
index 9278dec..132b64e 100644 (file)
@@ -92,6 +92,7 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                u32 num_export_targets;
                void *pexport_targets = NULL;
                struct ceph_timespec laggy_since;
+               struct ceph_mds_info *info;
 
                ceph_decode_need(p, end, sizeof(u64)*2 + 1 + sizeof(u32), bad);
                global_id = ceph_decode_64(p);
@@ -126,24 +127,27 @@ struct ceph_mdsmap *ceph_mdsmap_decode(void **p, void *end)
                     i+1, n, global_id, mds, inc,
                     ceph_pr_addr(&addr.in_addr),
                     ceph_mds_state_name(state));
-               if (mds >= 0 && mds < m->m_max_mds && state > 0) {
-                       m->m_info[mds].global_id = global_id;
-                       m->m_info[mds].state = state;
-                       m->m_info[mds].addr = addr;
-                       m->m_info[mds].laggy =
-                               (laggy_since.tv_sec != 0 ||
-                                laggy_since.tv_nsec != 0);
-                       m->m_info[mds].num_export_targets = num_export_targets;
-                       if (num_export_targets) {
-                               m->m_info[mds].export_targets =
-                                       kcalloc(num_export_targets, sizeof(u32),
-                                               GFP_NOFS);
-                               for (j = 0; j < num_export_targets; j++)
-                                       m->m_info[mds].export_targets[j] =
-                                              ceph_decode_32(&pexport_targets);
-                       } else {
-                               m->m_info[mds].export_targets = NULL;
-                       }
+
+               if (mds < 0 || mds >= m->m_max_mds || state <= 0)
+                       continue;
+
+               info = &m->m_info[mds];
+               info->global_id = global_id;
+               info->state = state;
+               info->addr = addr;
+               info->laggy = (laggy_since.tv_sec != 0 ||
+                              laggy_since.tv_nsec != 0);
+               info->num_export_targets = num_export_targets;
+               if (num_export_targets) {
+                       info->export_targets = kcalloc(num_export_targets,
+                                                      sizeof(u32), GFP_NOFS);
+                       if (info->export_targets == NULL)
+                               goto badmem;
+                       for (j = 0; j < num_export_targets; j++)
+                               info->export_targets[j] =
+                                      ceph_decode_32(&pexport_targets);
+               } else {
+                       info->export_targets = NULL;
                }
        }
 
@@ -170,7 +174,7 @@ bad:
                       DUMP_PREFIX_OFFSET, 16, 1,
                       start, end - start, true);
        ceph_mdsmap_destroy(m);
-       return ERR_PTR(-EINVAL);
+       return ERR_PTR(err);
 }
 
 void ceph_mdsmap_destroy(struct ceph_mdsmap *m)
index 7d377c9..6627b26 100644 (file)
@@ -357,7 +357,7 @@ static int parse_mount_options(struct ceph_mount_options **pfsopt,
        }
        err = -EINVAL;
        dev_name_end--;         /* back up to ':' separator */
-       if (*dev_name_end != ':') {
+       if (dev_name_end < dev_name || *dev_name_end != ':') {
                pr_err("device name is missing path (no : separator in %s)\n",
                                dev_name);
                goto out;
index 7ccfdb4..cbded57 100644 (file)
@@ -534,7 +534,7 @@ extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
 extern void ceph_caps_init(struct ceph_mds_client *mdsc);
 extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
 extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
-extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+extern void ceph_reserve_caps(struct ceph_mds_client *mdsc,
                             struct ceph_cap_reservation *ctx, int need);
 extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
                               struct ceph_cap_reservation *ctx);
@@ -692,7 +692,7 @@ extern int ceph_readdir_prepopulate(struct ceph_mds_request *req,
 extern int ceph_inode_holds_cap(struct inode *inode, int mask);
 
 extern int ceph_inode_set_size(struct inode *inode, loff_t size);
-extern void __ceph_do_pending_vmtruncate(struct inode *inode, bool needlock);
+extern void __ceph_do_pending_vmtruncate(struct inode *inode);
 extern void ceph_queue_vmtruncate(struct inode *inode);
 
 extern void ceph_queue_invalidate(struct inode *inode);
index 9b6b2b6..be661d8 100644 (file)
@@ -675,17 +675,18 @@ ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
        if (!ceph_is_valid_xattr(name))
                return -ENODATA;
 
-       spin_lock(&ci->i_ceph_lock);
-       dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
-            ci->i_xattrs.version, ci->i_xattrs.index_version);
 
        /* let's see if a virtual xattr was requested */
        vxattr = ceph_match_vxattr(inode, name);
        if (vxattr && !(vxattr->exists_cb && !vxattr->exists_cb(ci))) {
                err = vxattr->getxattr_cb(ci, value, size);
-               goto out;
+               return err;
        }
 
+       spin_lock(&ci->i_ceph_lock);
+       dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
+            ci->i_xattrs.version, ci->i_xattrs.index_version);
+
        if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
            (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
                goto get_xattr;
index 379f715..0442c3d 100644 (file)
@@ -160,11 +160,6 @@ static inline void ceph_decode_timespec(struct timespec *ts,
 static inline void ceph_encode_timespec(struct ceph_timespec *tv,
                                        const struct timespec *ts)
 {
-       BUG_ON(ts->tv_sec < 0);
-       BUG_ON(ts->tv_sec > (__kernel_time_t)U32_MAX);
-       BUG_ON(ts->tv_nsec < 0);
-       BUG_ON(ts->tv_nsec > (long)U32_MAX);
-
        tv->tv_sec = cpu_to_le32((u32)ts->tv_sec);
        tv->tv_nsec = cpu_to_le32((u32)ts->tv_nsec);
 }
index 186db0b..ce6df39 100644 (file)
@@ -145,7 +145,6 @@ struct ceph_osd_request {
        s32               r_reply_op_result[CEPH_OSD_MAX_OP];
        int               r_got_reply;
        int               r_linger;
-       int               r_completed;
 
        struct ceph_osd_client *r_osdc;
        struct kref       r_kref;
index 925ca58..8c93fa8 100644 (file)
@@ -39,6 +39,11 @@ static int should_authenticate(struct ceph_auth_client *ac)
        return xi->starting;
 }
 
+static int build_request(struct ceph_auth_client *ac, void *buf, void *end)
+{
+       return 0;
+}
+
 /*
  * the generic auth code decode the global_id, and we carry no actual
  * authenticate state, so nothing happens here.
@@ -106,6 +111,7 @@ static const struct ceph_auth_client_ops ceph_auth_none_ops = {
        .destroy = destroy,
        .is_authenticated = is_authenticated,
        .should_authenticate = should_authenticate,
+       .build_request = build_request,
        .handle_reply = handle_reply,
        .create_authorizer = ceph_auth_none_create_authorizer,
        .destroy_authorizer = ceph_auth_none_destroy_authorizer,
index 3a246a6..dd47889 100644 (file)
@@ -733,12 +733,14 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 
        object_size = le32_to_cpu(layout->fl_object_size);
        object_base = off - objoff;
-       if (truncate_size <= object_base) {
-               truncate_size = 0;
-       } else {
-               truncate_size -= object_base;
-               if (truncate_size > object_size)
-                       truncate_size = object_size;
+       if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
+               if (truncate_size <= object_base) {
+                       truncate_size = 0;
+               } else {
+                       truncate_size -= object_base;
+                       if (truncate_size > object_size)
+                               truncate_size = object_size;
+               }
        }
 
        osd_req_op_extent_init(req, 0, opcode, objoff, objlen,
@@ -1174,6 +1176,7 @@ static void __register_linger_request(struct ceph_osd_client *osdc,
                                    struct ceph_osd_request *req)
 {
        dout("__register_linger_request %p\n", req);
+       ceph_osdc_get_request(req);
        list_add_tail(&req->r_linger_item, &osdc->req_linger);
        if (req->r_osd)
                list_add_tail(&req->r_linger_osd,
@@ -1196,6 +1199,7 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
                if (list_empty(&req->r_osd_item))
                        req->r_osd = NULL;
        }
+       ceph_osdc_put_request(req);
 }
 
 void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1203,9 +1207,8 @@ void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
 {
        mutex_lock(&osdc->request_mutex);
        if (req->r_linger) {
-               __unregister_linger_request(osdc, req);
                req->r_linger = 0;
-               ceph_osdc_put_request(req);
+               __unregister_linger_request(osdc, req);
        }
        mutex_unlock(&osdc->request_mutex);
 }
@@ -1217,11 +1220,6 @@ void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
        if (!req->r_linger) {
                dout("set_request_linger %p\n", req);
                req->r_linger = 1;
-               /*
-                * caller is now responsible for calling
-                * unregister_linger_request
-                */
-               ceph_osdc_get_request(req);
        }
 }
 EXPORT_SYMBOL(ceph_osdc_set_request_linger);
@@ -1339,10 +1337,6 @@ static void __send_request(struct ceph_osd_client *osdc,
 
        ceph_msg_get(req->r_request); /* send consumes a ref */
 
-       /* Mark the request unsafe if this is the first timet's being sent. */
-
-       if (!req->r_sent && req->r_unsafe_callback)
-               req->r_unsafe_callback(req, true);
        req->r_sent = req->r_osd->o_incarnation;
 
        ceph_con_send(&req->r_osd->o_con, req->r_request);
@@ -1433,8 +1427,6 @@ static void handle_osds_timeout(struct work_struct *work)
 
 static void complete_request(struct ceph_osd_request *req)
 {
-       if (req->r_unsafe_callback)
-               req->r_unsafe_callback(req, false);
        complete_all(&req->r_safe_completion);  /* fsync waiter */
 }
 
@@ -1526,6 +1518,8 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        for (i = 0; i < numops; i++)
                req->r_reply_op_result[i] = ceph_decode_32(&p);
 
+       already_completed = req->r_got_reply;
+
        if (!req->r_got_reply) {
 
                req->r_result = result;
@@ -1556,19 +1550,23 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
            ((flags & CEPH_OSD_FLAG_WRITE) == 0))
                __unregister_request(osdc, req);
 
-       already_completed = req->r_completed;
-       req->r_completed = 1;
        mutex_unlock(&osdc->request_mutex);
-       if (already_completed)
-               goto done;
 
-       if (req->r_callback)
-               req->r_callback(req, msg);
-       else
-               complete_all(&req->r_completion);
+       if (!already_completed) {
+               if (req->r_unsafe_callback &&
+                   result >= 0 && !(flags & CEPH_OSD_FLAG_ONDISK))
+                       req->r_unsafe_callback(req, true);
+               if (req->r_callback)
+                       req->r_callback(req, msg);
+               else
+                       complete_all(&req->r_completion);
+       }
 
-       if (flags & CEPH_OSD_FLAG_ONDISK)
+       if (flags & CEPH_OSD_FLAG_ONDISK) {
+               if (req->r_unsafe_callback && already_completed)
+                       req->r_unsafe_callback(req, false);
                complete_request(req);
+       }
 
 done:
        dout("req=%p req->r_linger=%d\n", req, req->r_linger);
@@ -1633,8 +1631,10 @@ static void kick_requests(struct ceph_osd_client *osdc, int force_resend)
                        dout("%p tid %llu restart on osd%d\n",
                             req, req->r_tid,
                             req->r_osd ? req->r_osd->o_osd : -1);
+                       ceph_osdc_get_request(req);
                        __unregister_request(osdc, req);
                        __register_linger_request(osdc, req);
+                       ceph_osdc_put_request(req);
                        continue;
                }
 
@@ -2123,7 +2123,6 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
        __register_request(osdc, req);
        req->r_sent = 0;
        req->r_got_reply = 0;
-       req->r_completed = 0;
        rc = __map_request(osdc, req, 0);
        if (rc < 0) {
                if (nofail) {
@@ -2456,8 +2455,10 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        ceph_msg_revoke_incoming(req->r_reply);
 
        if (front > req->r_reply->front.iov_len) {
-               pr_warning("get_reply front %d > preallocated %d\n",
-                          front, (int)req->r_reply->front.iov_len);
+               pr_warning("get_reply front %d > preallocated %d (%u#%llu)\n",
+                          front, (int)req->r_reply->front.iov_len,
+                          (unsigned int)con->peer_name.type,
+                          le64_to_cpu(con->peer_name.num));
                m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS, false);
                if (!m)
                        goto out;