Merge branch 'master' of git://git.kernel.org/pub/scm/linux/kernel/git/jkirsher/net...
[cascardo/linux.git] / drivers / block / rbd.c
index 5e7110a..6c81a4c 100644 (file)
@@ -170,7 +170,9 @@ typedef void (*rbd_img_callback_t)(struct rbd_img_request *);
 struct rbd_obj_request;
 typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
 
-enum obj_request_type { OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES };
+enum obj_request_type {
+       OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+};
 
 struct rbd_obj_request {
        const char              *object_name;
@@ -194,7 +196,7 @@ struct rbd_obj_request {
 
        u64                     xferred;        /* bytes transferred */
        u64                     version;
-       s32                     result;
+       int                     result;
        atomic_t                done;
 
        rbd_obj_callback_t      callback;
@@ -224,11 +226,11 @@ struct rbd_img_request {
 };
 
 #define for_each_obj_request(ireq, oreq) \
-       list_for_each_entry(oreq, &ireq->obj_requests, links)
+       list_for_each_entry(oreq, &(ireq)->obj_requests, links)
 #define for_each_obj_request_from(ireq, oreq) \
-       list_for_each_entry_from(oreq, &ireq->obj_requests, links)
+       list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
 #define for_each_obj_request_safe(ireq, oreq, n) \
-       list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
+       list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
 
 struct rbd_snap {
        struct  device          dev;
@@ -259,10 +261,10 @@ struct rbd_device {
 
        char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
 
-       spinlock_t              lock;           /* queue lock */
+       spinlock_t              lock;           /* queue, flags, open_count */
 
        struct rbd_image_header header;
-       atomic_t                exists;
+       unsigned long           flags;          /* possibly lock protected */
        struct rbd_spec         *spec;
 
        char                    *header_name;
@@ -270,7 +272,7 @@ struct rbd_device {
        struct ceph_file_layout layout;
 
        struct ceph_osd_event   *watch_event;
-       struct ceph_osd_request *watch_request;
+       struct rbd_obj_request  *watch_request;
 
        struct rbd_spec         *parent_spec;
        u64                     parent_overlap;
@@ -287,7 +289,19 @@ struct rbd_device {
 
        /* sysfs related */
        struct device           dev;
-       unsigned long           open_count;
+       unsigned long           open_count;     /* protected by lock */
+};
+
+/*
+ * Flag bits for rbd_dev->flags.  If atomicity is required,
+ * rbd_dev->lock is used to protect access.
+ *
+ * Currently, only the "removing" flag (which is coupled with the
+ * "open_count" field) requires atomic access.
+ */
+enum rbd_dev_flags {
+       RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
+       RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
 };
 
 static DEFINE_MUTEX(ctl_mutex);          /* Serialize open/close/setup/teardown */
@@ -375,14 +389,23 @@ static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
        struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+       bool removing = false;
 
        if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
                return -EROFS;
 
+       spin_lock_irq(&rbd_dev->lock);
+       if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
+               removing = true;
+       else
+               rbd_dev->open_count++;
+       spin_unlock_irq(&rbd_dev->lock);
+       if (removing)
+               return -ENOENT;
+
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
        (void) get_device(&rbd_dev->dev);
        set_device_ro(bdev, rbd_dev->mapping.read_only);
-       rbd_dev->open_count++;
        mutex_unlock(&ctl_mutex);
 
        return 0;
@@ -391,10 +414,14 @@ static int rbd_open(struct block_device *bdev, fmode_t mode)
 static int rbd_release(struct gendisk *disk, fmode_t mode)
 {
        struct rbd_device *rbd_dev = disk->private_data;
+       unsigned long open_count_before;
+
+       spin_lock_irq(&rbd_dev->lock);
+       open_count_before = rbd_dev->open_count--;
+       spin_unlock_irq(&rbd_dev->lock);
+       rbd_assert(open_count_before > 0);
 
        mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-       rbd_assert(rbd_dev->open_count > 0);
-       rbd_dev->open_count--;
        put_device(&rbd_dev->dev);
        mutex_unlock(&ctl_mutex);
 
@@ -416,7 +443,7 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
        struct rbd_client *rbdc;
        int ret = -ENOMEM;
 
-       dout("rbd_client_create\n");
+       dout("%s:\n", __func__);
        rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
        if (!rbdc)
                goto out_opt;
@@ -440,8 +467,8 @@ static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
        spin_unlock(&rbd_client_list_lock);
 
        mutex_unlock(&ctl_mutex);
+       dout("%s: rbdc %p\n", __func__, rbdc);
 
-       dout("rbd_client_create created %p\n", rbdc);
        return rbdc;
 
 out_err:
@@ -452,6 +479,8 @@ out_mutex:
 out_opt:
        if (ceph_opts)
                ceph_destroy_options(ceph_opts);
+       dout("%s: error %d\n", __func__, ret);
+
        return ERR_PTR(ret);
 }
 
@@ -578,7 +607,7 @@ static void rbd_client_release(struct kref *kref)
 {
        struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
 
-       dout("rbd_release_client %p\n", rbdc);
+       dout("%s: rbdc %p\n", __func__, rbdc);
        spin_lock(&rbd_client_list_lock);
        list_del(&rbdc->node);
        spin_unlock(&rbd_client_list_lock);
@@ -780,7 +809,8 @@ static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
                        goto done;
                rbd_dev->mapping.read_only = true;
        }
-       atomic_set(&rbd_dev->exists, 1);
+       set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+
 done:
        return ret;
 }
@@ -1036,6 +1066,8 @@ out_err:
 
 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
 {
+       dout("%s: obj %p (was %d)\n", __func__, obj_request,
+               atomic_read(&obj_request->kref.refcount));
        kref_get(&obj_request->kref);
 }
 
@@ -1043,11 +1075,15 @@ static void rbd_obj_request_destroy(struct kref *kref);
 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
 {
        rbd_assert(obj_request != NULL);
+       dout("%s: obj %p (was %d)\n", __func__, obj_request,
+               atomic_read(&obj_request->kref.refcount));
        kref_put(&obj_request->kref, rbd_obj_request_destroy);
 }
 
 static void rbd_img_request_get(struct rbd_img_request *img_request)
 {
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+               atomic_read(&img_request->kref.refcount));
        kref_get(&img_request->kref);
 }
 
@@ -1055,34 +1091,48 @@ static void rbd_img_request_destroy(struct kref *kref);
 static void rbd_img_request_put(struct rbd_img_request *img_request)
 {
        rbd_assert(img_request != NULL);
+       dout("%s: img %p (was %d)\n", __func__, img_request,
+               atomic_read(&img_request->kref.refcount));
        kref_put(&img_request->kref, rbd_img_request_destroy);
 }
 
 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
 {
+       rbd_assert(obj_request->img_request == NULL);
+
        rbd_obj_request_get(obj_request);
        obj_request->img_request = img_request;
-       list_add_tail(&obj_request->links, &img_request->obj_requests);
-       obj_request->which = img_request->obj_request_count++;
+       obj_request->which = img_request->obj_request_count;
        rbd_assert(obj_request->which != BAD_WHICH);
+       img_request->obj_request_count++;
+       list_add_tail(&obj_request->links, &img_request->obj_requests);
+       dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+               obj_request->which);
 }
 
 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
                                        struct rbd_obj_request *obj_request)
 {
        rbd_assert(obj_request->which != BAD_WHICH);
-       obj_request->which = BAD_WHICH;
+
+       dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+               obj_request->which);
        list_del(&obj_request->links);
+       rbd_assert(img_request->obj_request_count > 0);
+       img_request->obj_request_count--;
+       rbd_assert(obj_request->which == img_request->obj_request_count);
+       obj_request->which = BAD_WHICH;
        rbd_assert(obj_request->img_request == img_request);
-       obj_request->callback = NULL;
        obj_request->img_request = NULL;
+       obj_request->callback = NULL;
        rbd_obj_request_put(obj_request);
 }
 
 static bool obj_request_type_valid(enum obj_request_type type)
 {
        switch (type) {
+       case OBJ_REQUEST_NODATA:
        case OBJ_REQUEST_BIO:
        case OBJ_REQUEST_PAGES:
                return true;
@@ -1091,7 +1141,7 @@ static bool obj_request_type_valid(enum obj_request_type type)
        }
 }
 
-struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
+static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
 {
        struct ceph_osd_req_op *op;
        va_list args;
@@ -1112,6 +1162,8 @@ struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
                if (opcode == CEPH_OSD_OP_WRITE)
                        op->payload_len = op->extent.length;
                break;
+       case CEPH_OSD_OP_STAT:
+               break;
        case CEPH_OSD_OP_CALL:
                /* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
                op->cls.class_name = va_arg(args, char *);
@@ -1159,140 +1211,17 @@ static void rbd_osd_req_op_destroy(struct ceph_osd_req_op *op)
        kfree(op);
 }
 
-/*
- * Send ceph osd request
- */
-static int rbd_do_request(struct request *rq,
-                         struct rbd_device *rbd_dev,
-                         struct ceph_snap_context *snapc,
-                         u64 snapid,
-                         const char *object_name, u64 ofs, u64 len,
-                         struct bio *bio,
-                         struct page **pages,
-                         int num_pages,
-                         int flags,
-                         struct ceph_osd_req_op *op,
-                         void (*rbd_cb)(struct ceph_osd_request *,
-                                        struct ceph_msg *),
-                         u64 *ver)
-{
-       struct ceph_osd_client *osdc;
-       struct ceph_osd_request *osd_req;
-       struct timespec mtime = CURRENT_TIME;
-       int ret;
-
-       dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
-               object_name, (unsigned long long) ofs,
-               (unsigned long long) len);
-
-       osdc = &rbd_dev->rbd_client->client->osdc;
-       osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
-       if (!osd_req)
-               return -ENOMEM;
-
-       osd_req->r_flags = flags;
-       osd_req->r_pages = pages;
-       if (bio) {
-               osd_req->r_bio = bio;
-               bio_get(osd_req->r_bio);
-       }
-
-       osd_req->r_callback = rbd_cb;
-       osd_req->r_priv = NULL;
-
-       strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
-       osd_req->r_oid_len = strlen(osd_req->r_oid);
-
-       osd_req->r_file_layout = rbd_dev->layout;       /* struct */
-       osd_req->r_num_pages = calc_pages_for(ofs, len);
-       osd_req->r_page_alignment = ofs & ~PAGE_MASK;
-
-       ceph_osdc_build_request(osd_req, ofs, len, 1, op,
-                               snapc, snapid, &mtime);
-
-       if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
-               ceph_osdc_set_request_linger(osdc, osd_req);
-               rbd_dev->watch_request = osd_req;
-       }
-
-       ret = ceph_osdc_start_request(osdc, osd_req, false);
-       if (ret < 0)
-               goto done_err;
-
-       if (!rbd_cb) {
-               u64 version;
-
-               ret = ceph_osdc_wait_request(osdc, osd_req);
-               version = le64_to_cpu(osd_req->r_reassert_version.version);
-               if (ver)
-                       *ver = version;
-               dout("reassert_ver=%llu\n", (unsigned long long) version);
-               ceph_osdc_put_request(osd_req);
-       }
-       return ret;
-
-done_err:
-       if (bio)
-               bio_chain_put(osd_req->r_bio);
-       ceph_osdc_put_request(osd_req);
-
-       return ret;
-}
-
-static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
-                               struct ceph_msg *msg)
-{
-       ceph_osdc_put_request(osd_req);
-}
-
-/*
- * Do a synchronous ceph osd operation
- */
-static int rbd_req_sync_op(struct rbd_device *rbd_dev,
-                          int flags,
-                          struct ceph_osd_req_op *op,
-                          const char *object_name,
-                          u64 ofs, u64 inbound_size,
-                          char *inbound,
-                          u64 *ver)
-{
-       int ret;
-       struct page **pages;
-       int num_pages;
-
-       rbd_assert(op != NULL);
-
-       num_pages = calc_pages_for(ofs, inbound_size);
-       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
-
-       ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
-                         object_name, ofs, inbound_size, NULL,
-                         pages, num_pages,
-                         flags,
-                         op,
-                         NULL,
-                         ver);
-       if (ret < 0)
-               goto done;
-
-       if ((flags & CEPH_OSD_FLAG_READ) && inbound)
-               ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
-
-done:
-       ceph_release_page_vector(pages, num_pages);
-       return ret;
-}
-
 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
                                struct rbd_obj_request *obj_request)
 {
+       dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
+
        return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
 }
 
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
+       dout("%s: img %p\n", __func__, img_request);
        if (img_request->callback)
                img_request->callback(img_request);
        else
@@ -1303,202 +1232,138 @@ static void rbd_img_request_complete(struct rbd_img_request *img_request)
 
 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
 {
+       dout("%s: obj %p\n", __func__, obj_request);
+
        return wait_for_completion_interruptible(&obj_request->completion);
 }
 
-static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+static void obj_request_done_init(struct rbd_obj_request *obj_request)
 {
-       if (obj_request->callback)
-               obj_request->callback(obj_request);
-       else
-               complete_all(&obj_request->completion);
+       atomic_set(&obj_request->done, 0);
+       smp_wmb();
 }
 
-/*
- * Request sync osd watch
- */
-static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
-                                  u64 ver,
-                                  u64 notify_id)
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
 {
-       struct ceph_osd_req_op *op;
-       int ret;
-
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
-       if (!op)
-               return -ENOMEM;
-
-       ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
-                         rbd_dev->header_name, 0, 0, NULL,
-                         NULL, 0,
-                         CEPH_OSD_FLAG_READ,
-                         op,
-                         rbd_simple_req_cb, NULL);
+       int done;
 
-       rbd_osd_req_op_destroy(op);
+       done = atomic_inc_return(&obj_request->done);
+       if (done > 1) {
+               struct rbd_img_request *img_request = obj_request->img_request;
+               struct rbd_device *rbd_dev;
 
-       return ret;
+               rbd_dev = img_request ? img_request->rbd_dev : NULL;
+               rbd_warn(rbd_dev, "obj_request %p was already done\n",
+                       obj_request);
+       }
 }
 
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
 {
-       struct rbd_device *rbd_dev = (struct rbd_device *)data;
-       u64 hver;
-       int rc;
-
-       if (!rbd_dev)
-               return;
-
-       dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
-               rbd_dev->header_name, (unsigned long long) notify_id,
-               (unsigned int) opcode);
-       rc = rbd_dev_refresh(rbd_dev, &hver);
-       if (rc)
-               rbd_warn(rbd_dev, "got notification but failed to "
-                          " update snaps: %d\n", rc);
-
-       rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
+       smp_mb();
+       return atomic_read(&obj_request->done) != 0;
 }
 
-/*
- * Request sync osd watch/unwatch.  The value of "start" determines
- * whether a watch request is being initiated or torn down.
- */
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
 {
-       struct ceph_osd_req_op *op;
-       int ret = 0;
-
-       rbd_assert(start ^ !!rbd_dev->watch_event);
-       rbd_assert(start ^ !!rbd_dev->watch_request);
-
-       if (start) {
-               struct ceph_osd_client *osdc;
-
-               osdc = &rbd_dev->rbd_client->client->osdc;
-               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
-                                               &rbd_dev->watch_event);
-               if (ret < 0)
-                       return ret;
-       }
-
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
-                               rbd_dev->watch_event->cookie,
-                               rbd_dev->header.obj_version, start);
-       if (op)
-               ret = rbd_req_sync_op(rbd_dev,
-                             CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-                             op, rbd_dev->header_name,
-                             0, 0, NULL, NULL);
-
-       /* Cancel the event if we're tearing down, or on error */
-
-       if (!start || !op || ret < 0) {
-               ceph_osdc_cancel_event(rbd_dev->watch_event);
-               rbd_dev->watch_event = NULL;
-       }
-       rbd_osd_req_op_destroy(op);
-
-       return ret;
+       dout("%s: obj %p cb %p\n", __func__, obj_request,
+               obj_request->callback);
+       if (obj_request->callback)
+               obj_request->callback(obj_request);
+       else
+               complete_all(&obj_request->completion);
 }
 
-/*
- * Synchronous osd object method call
- */
-static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
-                            const char *object_name,
-                            const char *class_name,
-                            const char *method_name,
-                            const char *outbound,
-                            size_t outbound_size,
-                            char *inbound,
-                            size_t inbound_size,
-                            u64 *ver)
+static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
 {
-       struct ceph_osd_req_op *op;
-       int ret;
-
-       /*
-        * Any input parameters required by the method we're calling
-        * will be sent along with the class and method names as
-        * part of the message payload.  That data and its size are
-        * supplied via the indata and indata_len fields (named from
-        * the perspective of the server side) in the OSD request
-        * operation.
-        */
-       op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
-                                       method_name, outbound, outbound_size);
-       if (!op)
-               return -ENOMEM;
-
-       ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
-                              object_name, 0, inbound_size, inbound,
-                              ver);
-
-       rbd_osd_req_op_destroy(op);
-
-       dout("cls_exec returned %d\n", ret);
-       return ret;
+       dout("%s: obj %p\n", __func__, obj_request);
+       obj_request_done_set(obj_request);
 }
 
-static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
-                               struct ceph_osd_op *op)
+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
-       u64 xferred;
-
+       dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
+               obj_request->result, obj_request->xferred, obj_request->length);
        /*
-        * We support a 64-bit length, but ultimately it has to be
-        * passed to blk_end_request(), which takes an unsigned int.
+        * ENOENT means a hole in the object.  We zero-fill the
+        * entire length of the request.  A short read also implies
+        * zero-fill to the end of the request.  Either way we
+        * update the xferred count to indicate the whole request
+        * was satisfied.
         */
-       xferred = le64_to_cpu(op->extent.length);
-       rbd_assert(xferred < (u64) UINT_MAX);
-       if (obj_request->result == (s32) -ENOENT) {
+       if (obj_request->result == -ENOENT) {
                zero_bio_chain(obj_request->bio_list, 0);
                obj_request->result = 0;
-       } else if (xferred < obj_request->length && !obj_request->result) {
-               zero_bio_chain(obj_request->bio_list, xferred);
-               xferred = obj_request->length;
+               obj_request->xferred = obj_request->length;
+       } else if (obj_request->xferred < obj_request->length &&
+                       !obj_request->result) {
+               zero_bio_chain(obj_request->bio_list, obj_request->xferred);
+               obj_request->xferred = obj_request->length;
        }
-       obj_request->xferred = xferred;
-       atomic_set(&obj_request->done, 1);
+       obj_request_done_set(obj_request);
+}
+
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
+{
+       dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+               obj_request->result, obj_request->length);
+       /*
+        * There is no such thing as a successful short write.
+        * Our xferred value is the number of bytes transferred
+        * back.  Set it to our originally-requested length.
+        */
+       obj_request->xferred = obj_request->length;
+       obj_request_done_set(obj_request);
 }
 
-static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
-                               struct ceph_osd_op *op)
+/*
+ * For a simple stat call there's nothing to do.  We'll do more if
+ * this is part of a write sequence for a layered image.
+ */
+static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
 {
-       obj_request->xferred = le64_to_cpu(op->extent.length);
-       atomic_set(&obj_request->done, 1);
+       dout("%s: obj %p\n", __func__, obj_request);
+       obj_request_done_set(obj_request);
 }
 
 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                                struct ceph_msg *msg)
 {
        struct rbd_obj_request *obj_request = osd_req->r_priv;
-       struct ceph_osd_reply_head *reply_head;
-       struct ceph_osd_op *op;
-       u32 num_ops;
        u16 opcode;
 
+       dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
        rbd_assert(osd_req == obj_request->osd_req);
        rbd_assert(!!obj_request->img_request ^
                                (obj_request->which == BAD_WHICH));
 
-       obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
-       reply_head = msg->front.iov_base;
-       obj_request->result = (s32) le32_to_cpu(reply_head->result);
+       if (osd_req->r_result < 0)
+               obj_request->result = osd_req->r_result;
        obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
 
-       num_ops = le32_to_cpu(reply_head->num_ops);
-       WARN_ON(num_ops != 1);  /* For now */
+       WARN_ON(osd_req->r_num_ops != 1);       /* For now */
 
-       op = &reply_head->ops[0];
-       opcode = le16_to_cpu(op->op);
+       /*
+        * We support a 64-bit length, but ultimately it has to be
+        * passed to blk_end_request(), which takes an unsigned int.
+        */
+       obj_request->xferred = osd_req->r_reply_op_len[0];
+       rbd_assert(obj_request->xferred < (u64) UINT_MAX);
+       opcode = osd_req->r_request_ops[0].op;
        switch (opcode) {
        case CEPH_OSD_OP_READ:
-               rbd_osd_read_callback(obj_request, op);
+               rbd_osd_read_callback(obj_request);
                break;
        case CEPH_OSD_OP_WRITE:
-               rbd_osd_write_callback(obj_request, op);
+               rbd_osd_write_callback(obj_request);
+               break;
+       case CEPH_OSD_OP_STAT:
+               rbd_osd_stat_callback(obj_request);
+               break;
+       case CEPH_OSD_OP_CALL:
+       case CEPH_OSD_OP_NOTIFY_ACK:
+       case CEPH_OSD_OP_WATCH:
+               rbd_osd_trivial_callback(obj_request);
                break;
        default:
                rbd_warn(NULL, "%s: unsupported op %hu\n",
@@ -1506,7 +1371,7 @@ static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
                break;
        }
 
-       if (atomic_read(&obj_request->done))
+       if (obj_request_done_test(obj_request))
                rbd_obj_request_complete(obj_request);
 }
 
@@ -1543,12 +1408,11 @@ static struct ceph_osd_request *rbd_osd_req_create(
 
        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
+       case OBJ_REQUEST_NODATA:
+               break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                rbd_assert(obj_request->bio_list != NULL);
                osd_req->r_bio = obj_request->bio_list;
-               bio_get(osd_req->r_bio);
-               /* osd client requires "num pages" even for bio */
-               osd_req->r_num_pages = calc_pages_for(offset, length);
                break;
        case OBJ_REQUEST_PAGES:
                osd_req->r_pages = obj_request->pages;
@@ -1614,10 +1478,13 @@ static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
        obj_request->which = BAD_WHICH;
        obj_request->type = type;
        INIT_LIST_HEAD(&obj_request->links);
-       atomic_set(&obj_request->done, 0);
+       obj_request_done_init(obj_request);
        init_completion(&obj_request->completion);
        kref_init(&obj_request->kref);
 
+       dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
+               offset, length, (int)type, obj_request);
+
        return obj_request;
 }
 
@@ -1627,6 +1494,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
 
        obj_request = container_of(kref, struct rbd_obj_request, kref);
 
+       dout("%s: obj %p\n", __func__, obj_request);
+
        rbd_assert(obj_request->img_request == NULL);
        rbd_assert(obj_request->which == BAD_WHICH);
 
@@ -1635,6 +1504,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
 
        rbd_assert(obj_request_type_valid(obj_request->type));
        switch (obj_request->type) {
+       case OBJ_REQUEST_NODATA:
+               break;          /* Nothing to do */
        case OBJ_REQUEST_BIO:
                if (obj_request->bio_list)
                        bio_chain_put(obj_request->bio_list);
@@ -1654,7 +1525,8 @@ static void rbd_obj_request_destroy(struct kref *kref)
  * that comprises the image request, and the Linux request pointer
  * (if there is one).
  */
-struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
+static struct rbd_img_request *rbd_img_request_create(
+                                       struct rbd_device *rbd_dev,
                                        u64 offset, u64 length,
                                        bool write_request)
 {
@@ -1694,6 +1566,10 @@ struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
        rbd_img_request_get(img_request);       /* Avoid a warning */
        rbd_img_request_put(img_request);       /* TEMPORARY */
 
+       dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
+               write_request ? "write" : "read", offset, length,
+               img_request);
+
        return img_request;
 }
 
@@ -1705,8 +1581,11 @@ static void rbd_img_request_destroy(struct kref *kref)
 
        img_request = container_of(kref, struct rbd_img_request, kref);
 
+       dout("%s: img %p\n", __func__, img_request);
+
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
                rbd_img_obj_request_del(img_request, obj_request);
+       rbd_assert(img_request->obj_request_count == 0);
 
        if (img_request->write_request)
                ceph_put_snap_context(img_request->snapc);
@@ -1725,12 +1604,15 @@ static int rbd_img_request_fill_bio(struct rbd_img_request *img_request,
        u64 resid;
        u16 opcode;
 
+       dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+
        opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
                                              : CEPH_OSD_OP_READ;
        bio_offset = 0;
        image_offset = img_request->offset;
        rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
        resid = img_request->length;
+       rbd_assert(resid > 0);
        while (resid) {
                const char *object_name;
                unsigned int clone_size;
@@ -1798,8 +1680,11 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
        bool more = true;
 
        img_request = obj_request->img_request;
+
+       dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
        rbd_assert(img_request != NULL);
        rbd_assert(img_request->rq != NULL);
+       rbd_assert(img_request->obj_request_count > 0);
        rbd_assert(which != BAD_WHICH);
        rbd_assert(which < img_request->obj_request_count);
        rbd_assert(which >= img_request->next_completion);
@@ -1815,7 +1700,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
                rbd_assert(more);
                rbd_assert(which < img_request->obj_request_count);
 
-               if (!atomic_read(&obj_request->done))
+               if (!obj_request_done_test(obj_request))
                        break;
 
                rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
@@ -1829,6 +1714,7 @@ static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
                more = blk_end_request(img_request->rq, result, xferred);
                which++;
        }
+
        rbd_assert(more ^ (which == img_request->obj_request_count));
        img_request->next_completion = which;
 out:
@@ -1844,6 +1730,7 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
 
+       dout("%s: img %p\n", __func__, img_request);
        for_each_obj_request(img_request, obj_request) {
                int ret;
 
@@ -1862,7 +1749,219 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
        return 0;
 }
 
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
+                                  u64 ver, u64 notify_id)
+{
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_req_op *op;
+       struct ceph_osd_client *osdc;
+       int ret;
+
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                                       OBJ_REQUEST_NODATA);
+       if (!obj_request)
+               return -ENOMEM;
+
+       ret = -ENOMEM;
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+       if (!op)
+               goto out;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out;
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       obj_request->callback = rbd_obj_request_put;
+       ret = rbd_obj_request_submit(osdc, obj_request);
+out:
+       if (ret)
+               rbd_obj_request_put(obj_request);
+
+       return ret;
+}
+
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+       struct rbd_device *rbd_dev = (struct rbd_device *)data;
+       u64 hver;
+       int rc;
+
+       if (!rbd_dev)
+               return;
+
+       dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
+               rbd_dev->header_name, (unsigned long long) notify_id,
+               (unsigned int) opcode);
+       rc = rbd_dev_refresh(rbd_dev, &hver);
+       if (rc)
+               rbd_warn(rbd_dev, "got notification but failed to "
+                          " update snaps: %d\n", rc);
+
+       rbd_obj_notify_ack(rbd_dev, hver, notify_id);
+}
+
+/*
+ * Request sync osd watch/unwatch.  The value of "start" determines
+ * whether a watch request is being initiated or torn down.
+ */
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+{
+       struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_req_op *op;
+       int ret;
+
+       rbd_assert(start ^ !!rbd_dev->watch_event);
+       rbd_assert(start ^ !!rbd_dev->watch_request);
+
+       if (start) {
+               ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+                                               &rbd_dev->watch_event);
+               if (ret < 0)
+                       return ret;
+               rbd_assert(rbd_dev->watch_event != NULL);
+       }
+
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+                                                       OBJ_REQUEST_NODATA);
+       if (!obj_request)
+               goto out_cancel;
+
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+                               rbd_dev->watch_event->cookie,
+                               rbd_dev->header.obj_version, start);
+       if (!op)
+               goto out_cancel;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
+                                                       obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out_cancel;
+
+       if (start)
+               ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+       else
+               ceph_osdc_unregister_linger_request(osdc,
+                                       rbd_dev->watch_request->osd_req);
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out_cancel;
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out_cancel;
+       ret = obj_request->result;
+       if (ret)
+               goto out_cancel;
+
+       /*
+        * A watch request is set to linger, so the underlying osd
+        * request won't go away until we unregister it.  We retain
+        * a pointer to the object request during that time (in
+        * rbd_dev->watch_request), so we'll keep a reference to
+        * it.  We'll drop that reference (below) after we've
+        * unregistered it.
+        */
+       if (start) {
+               rbd_dev->watch_request = obj_request;
+
+               return 0;
+       }
+
+       /* We have successfully torn down the watch request */
+
+       rbd_obj_request_put(rbd_dev->watch_request);
+       rbd_dev->watch_request = NULL;
+out_cancel:
+       /* Cancel the event if we're tearing down, or on error */
+       ceph_osdc_cancel_event(rbd_dev->watch_event);
+       rbd_dev->watch_event = NULL;
+       if (obj_request)
+               rbd_obj_request_put(obj_request);
+
+       return ret;
+}
+
+/*
+ * Synchronous osd object method call
+ */
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
+                            const char *object_name,
+                            const char *class_name,
+                            const char *method_name,
+                            const char *outbound,
+                            size_t outbound_size,
+                            char *inbound,
+                            size_t inbound_size,
+                            u64 *version)
+{
+       struct rbd_obj_request *obj_request;
+       struct ceph_osd_client *osdc;
+       struct ceph_osd_req_op *op;
+       struct page **pages;
+       u32 page_count;
+       int ret;
+
+       /*
+        * Method calls are ultimately read operations but they
+        * don't involve object data (so no offset or length).
+        * The result should placed into the inbound buffer
+        * provided.  They also supply outbound data--parameters for
+        * the object method.  Currently if this is present it will
+        * be a snapshot id.
+        */
+       page_count = (u32) calc_pages_for(0, inbound_size);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       ret = -ENOMEM;
+       obj_request = rbd_obj_request_create(object_name, 0, 0,
+                                                       OBJ_REQUEST_PAGES);
+       if (!obj_request)
+               goto out;
+
+       obj_request->pages = pages;
+       obj_request->page_count = page_count;
+
+       op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
+                                       method_name, outbound, outbound_size);
+       if (!op)
+               goto out;
+       obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               obj_request, op);
+       rbd_osd_req_op_destroy(op);
+       if (!obj_request->osd_req)
+               goto out;
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, obj_request);
+       if (ret)
+               goto out;
+       ret = rbd_obj_request_wait(obj_request);
+       if (ret)
+               goto out;
+
+       ret = obj_request->result;
+       if (ret < 0)
+               goto out;
+       ret = 0;
+       ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
+       if (version)
+               *version = obj_request->version;
+out:
+       if (obj_request)
+               rbd_obj_request_put(obj_request);
+       else
+               ceph_release_page_vector(pages, page_count);
+
+       return ret;
+}
+
 static void rbd_request_fn(struct request_queue *q)
+               __releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct rbd_device *rbd_dev = q->queuedata;
        bool read_only = rbd_dev->mapping.read_only;
@@ -1878,6 +1977,19 @@ static void rbd_request_fn(struct request_queue *q)
                /* Ignore any non-FS requests that filter through. */
 
                if (rq->cmd_type != REQ_TYPE_FS) {
+                       dout("%s: non-fs request type %d\n", __func__,
+                               (int) rq->cmd_type);
+                       __blk_end_request_all(rq, 0);
+                       continue;
+               }
+
+               /* Ignore/skip any zero-length requests */
+
+               offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
+               length = (u64) blk_rq_bytes(rq);
+
+               if (!length) {
+                       dout("%s: zero-length request\n", __func__);
                        __blk_end_request_all(rq, 0);
                        continue;
                }
@@ -1893,18 +2005,20 @@ static void rbd_request_fn(struct request_queue *q)
                        rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
                }
 
-               /* Quit early if the snapshot has disappeared */
-
-               if (!atomic_read(&rbd_dev->exists)) {
+               /*
+                * Quit early if the mapped snapshot no longer
+                * exists.  It's still possible the snapshot will
+                * have disappeared by the time our request arrives
+                * at the osd, but there's no sense in sending it if
+                * we already know.
+                */
+               if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
                        dout("request for non-existent snapshot");
                        rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
                        result = -ENXIO;
                        goto end_request;
                }
 
-               offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
-               length = (u64) blk_rq_bytes(rq);
-
                result = -EINVAL;
                if (WARN_ON(offset && length > U64_MAX - offset + 1))
                        goto end_request;       /* Shouldn't happen */
@@ -2003,6 +2117,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        struct ceph_osd_client *osdc;
        struct page **pages = NULL;
        u32 page_count;
+       size_t size;
        int ret;
 
        page_count = (u32) calc_pages_for(offset, length);
@@ -2012,7 +2127,7 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
 
        ret = -ENOMEM;
        obj_request = rbd_obj_request_create(object_name, offset, length,
-                                               OBJ_REQUEST_PAGES);
+                                                       OBJ_REQUEST_PAGES);
        if (!obj_request)
                goto out;
 
@@ -2039,7 +2154,12 @@ static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
        ret = obj_request->result;
        if (ret < 0)
                goto out;
-       ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
+
+       rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
+       size = (size_t) obj_request->xferred;
+       ceph_copy_from_page_vector(pages, buf, 0, size);
+       rbd_assert(size <= (size_t) INT_MAX);
+       ret = (int) size;
        if (version)
                *version = obj_request->version;
 out:
@@ -2092,7 +2212,6 @@ rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
                ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
                                       0, size,
                                       (char *) ondisk, version);
-
                if (ret < 0)
                        goto out_err;
                if (WARN_ON((size_t) ret < size)) {
@@ -2577,7 +2696,7 @@ static void rbd_spec_free(struct kref *kref)
        kfree(spec);
 }
 
-struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                                struct rbd_spec *spec)
 {
        struct rbd_device *rbd_dev;
@@ -2587,7 +2706,7 @@ struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
                return NULL;
 
        spin_lock_init(&rbd_dev->lock);
-       atomic_set(&rbd_dev->exists, 0);
+       rbd_dev->flags = 0;
        INIT_LIST_HEAD(&rbd_dev->node);
        INIT_LIST_HEAD(&rbd_dev->snaps);
        init_rwsem(&rbd_dev->header_rwsem);
@@ -2712,11 +2831,11 @@ static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
                __le64 size;
        } __attribute__ ((packed)) size_buf = { 0 };
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_size",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &size_buf, sizeof (size_buf), NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
 
@@ -2747,14 +2866,13 @@ static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
        if (!reply_buf)
                return -ENOMEM;
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_object_prefix",
                                NULL, 0,
                                reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
-       ret = 0;    /* rbd_req_sync_exec() can return positive */
 
        p = reply_buf;
        rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2785,12 +2903,12 @@ static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
        u64 incompat;
        int ret;
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_features",
                                (char *) &snapid, sizeof (snapid),
                                (char *) &features_buf, sizeof (features_buf),
                                NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                return ret;
 
@@ -2841,11 +2959,11 @@ static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
        }
 
        snapid = cpu_to_le64(CEPH_NOSNAP);
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_parent",
                                (char *) &snapid, sizeof (snapid),
                                (char *) reply_buf, size, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out_err;
 
@@ -2912,7 +3030,7 @@ static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
        if (!reply_buf)
                goto out;
 
-       ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+       ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
                                "rbd", "dir_get_name",
                                image_id, image_id_size,
                                (char *) reply_buf, size, NULL);
@@ -3018,11 +3136,11 @@ static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
        if (!reply_buf)
                return -ENOMEM;
 
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapcontext",
                                NULL, 0,
                                reply_buf, size, ver);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
 
@@ -3087,11 +3205,11 @@ static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
                return ERR_PTR(-ENOMEM);
 
        snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
-       ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+       ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
                                "rbd", "get_snapshot_name",
                                (char *) &snap_id, sizeof (snap_id),
                                reply_buf, size, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
 
@@ -3216,10 +3334,17 @@ static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
                if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
                        struct list_head *next = links->next;
 
-                       /* Existing snapshot not in the new snap context */
-
+                       /*
+                        * A previously-existing snapshot is not in
+                        * the new snap context.
+                        *
+                        * If the now missing snapshot is the one the
+                        * image is mapped to, clear its exists flag
+                        * so we can avoid sending any more requests
+                        * to it.
+                        */
                        if (rbd_dev->spec->snap_id == snap->id)
-                               atomic_set(&rbd_dev->exists, 0);
+                               clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
                        rbd_remove_snap_dev(snap);
                        dout("%ssnap id %llu has been removed\n",
                                rbd_dev->spec->snap_id == snap->id ?
@@ -3293,7 +3418,7 @@ static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
        struct rbd_snap *snap;
        int ret = 0;
 
-       dout("%s called\n", __func__);
+       dout("%s:\n", __func__);
        if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
                return -EIO;
 
@@ -3679,14 +3804,13 @@ static int rbd_dev_image_id(struct rbd_device *rbd_dev)
                goto out;
        }
 
-       ret = rbd_req_sync_exec(rbd_dev, object_name,
+       ret = rbd_obj_method_sync(rbd_dev, object_name,
                                "rbd", "get_id",
                                NULL, 0,
                                response, RBD_IMAGE_ID_LEN_MAX, NULL);
-       dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+       dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
        if (ret < 0)
                goto out;
-       ret = 0;    /* rbd_req_sync_exec() can return positive */
 
        p = response;
        rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
@@ -3879,7 +4003,7 @@ static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
        if (ret)
                goto err_out_bus;
 
-       ret = rbd_req_sync_watch(rbd_dev, 1);
+       ret = rbd_dev_header_watch_sync(rbd_dev, 1);
        if (ret)
                goto err_out_bus;
 
@@ -4035,14 +4159,8 @@ static void rbd_dev_release(struct device *dev)
 {
        struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-       if (rbd_dev->watch_request) {
-               struct ceph_client *client = rbd_dev->rbd_client->client;
-
-               ceph_osdc_unregister_linger_request(&client->osdc,
-                                                   rbd_dev->watch_request);
-       }
        if (rbd_dev->watch_event)
-               rbd_req_sync_watch(rbd_dev, 0);
+               rbd_dev_header_watch_sync(rbd_dev, 0);
 
        /* clean up and free blkdev */
        rbd_free_disk(rbd_dev);
@@ -4086,10 +4204,14 @@ static ssize_t rbd_remove(struct bus_type *bus,
                goto done;
        }
 
-       if (rbd_dev->open_count) {
+       spin_lock_irq(&rbd_dev->lock);
+       if (rbd_dev->open_count)
                ret = -EBUSY;
+       else
+               set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
+       spin_unlock_irq(&rbd_dev->lock);
+       if (ret < 0)
                goto done;
-       }
 
        rbd_remove_all_snaps(rbd_dev);
        rbd_bus_del_dev(rbd_dev);
@@ -4125,10 +4247,15 @@ static void rbd_sysfs_cleanup(void)
        device_unregister(&rbd_root_dev);
 }
 
-int __init rbd_init(void)
+static int __init rbd_init(void)
 {
        int rc;
 
+       if (!libceph_compatible(NULL)) {
+               rbd_warn(NULL, "libceph incompatibility (quitting)");
+
+               return -EINVAL;
+       }
        rc = rbd_sysfs_init();
        if (rc)
                return rc;
@@ -4136,7 +4263,7 @@ int __init rbd_init(void)
        return 0;
 }
 
-void __exit rbd_exit(void)
+static void __exit rbd_exit(void)
 {
        rbd_sysfs_cleanup();
 }