struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);
-enum obj_request_type { OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES };
+enum obj_request_type {
+ OBJ_REQUEST_NODATA, OBJ_REQUEST_BIO, OBJ_REQUEST_PAGES
+};
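+
+/*
+ * OBJ_REQUEST_NODATA is for requests that carry no data payload
+ * (stat, watch, and notify-ack, for example), so neither a bio
+ * chain nor a page vector gets attached to the object request.
+ */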
struct rbd_obj_request {
const char *object_name;
u64 xferred; /* bytes transferred */
u64 version;
- s32 result;
+ int result;
atomic_t done;
rbd_obj_callback_t callback;
};
#define for_each_obj_request(ireq, oreq) \
- list_for_each_entry(oreq, &ireq->obj_requests, links)
+ list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
- list_for_each_entry_from(oreq, &ireq->obj_requests, links)
+ list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
- list_for_each_entry_safe_reverse(oreq, n, &ireq->obj_requests, links)
+ list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
struct rbd_snap {
struct device dev;
char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
- spinlock_t lock; /* queue lock */
+ spinlock_t lock; /* queue, flags, open_count */
struct rbd_image_header header;
- atomic_t exists;
+ unsigned long flags; /* possibly lock protected */
struct rbd_spec *spec;
char *header_name;
struct ceph_file_layout layout;
struct ceph_osd_event *watch_event;
- struct ceph_osd_request *watch_request;
+ struct rbd_obj_request *watch_request;
struct rbd_spec *parent_spec;
u64 parent_overlap;
/* sysfs related */
struct device dev;
- unsigned long open_count;
+ unsigned long open_count; /* protected by lock */
+};
+
+/*
+ * Flag bits for rbd_dev->flags. If atomicity is required,
+ * rbd_dev->lock is used to protect access.
+ *
+ * Currently, only the "removing" flag (which is coupled with the
+ * "open_count" field) requires atomic access.
+ */
+enum rbd_dev_flags {
+ RBD_DEV_FLAG_EXISTS, /* mapped snapshot has not been deleted */
+ RBD_DEV_FLAG_REMOVING, /* this mapping is being removed */
};
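+
+/*
+ * RBD_DEV_FLAG_EXISTS is set and cleared with plain set_bit() and
+ * clear_bit().  RBD_DEV_FLAG_REMOVING is only tested and set under
+ * rbd_dev->lock, together with the open_count field it is coupled
+ * with (see rbd_open() and the remove path below).
+ */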
static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
static int rbd_open(struct block_device *bdev, fmode_t mode)
{
struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
+ bool removing = false;
if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
return -EROFS;
+ spin_lock_irq(&rbd_dev->lock);
+ if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
+ removing = true;
+ else
+ rbd_dev->open_count++;
+ spin_unlock_irq(&rbd_dev->lock);
+ if (removing)
+ return -ENOENT;
+
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
(void) get_device(&rbd_dev->dev);
set_device_ro(bdev, rbd_dev->mapping.read_only);
- rbd_dev->open_count++;
mutex_unlock(&ctl_mutex);
return 0;
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
struct rbd_device *rbd_dev = disk->private_data;
+ unsigned long open_count_before;
+
+ spin_lock_irq(&rbd_dev->lock);
+ open_count_before = rbd_dev->open_count--;
+ spin_unlock_irq(&rbd_dev->lock);
+ rbd_assert(open_count_before > 0);
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
- rbd_assert(rbd_dev->open_count > 0);
- rbd_dev->open_count--;
put_device(&rbd_dev->dev);
mutex_unlock(&ctl_mutex);
struct rbd_client *rbdc;
int ret = -ENOMEM;
- dout("rbd_client_create\n");
+ dout("%s:\n", __func__);
rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
if (!rbdc)
goto out_opt;
spin_unlock(&rbd_client_list_lock);
mutex_unlock(&ctl_mutex);
+ dout("%s: rbdc %p\n", __func__, rbdc);
- dout("rbd_client_create created %p\n", rbdc);
return rbdc;
out_err:
out_opt:
if (ceph_opts)
ceph_destroy_options(ceph_opts);
+ dout("%s: error %d\n", __func__, ret);
+
return ERR_PTR(ret);
}
{
struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
- dout("rbd_release_client %p\n", rbdc);
+ dout("%s: rbdc %p\n", __func__, rbdc);
spin_lock(&rbd_client_list_lock);
list_del(&rbdc->node);
spin_unlock(&rbd_client_list_lock);
goto done;
rbd_dev->mapping.read_only = true;
}
- atomic_set(&rbd_dev->exists, 1);
+ set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
+
done:
return ret;
}
static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
{
+ dout("%s: obj %p (was %d)\n", __func__, obj_request,
+ atomic_read(&obj_request->kref.refcount));
kref_get(&obj_request->kref);
}
static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
{
rbd_assert(obj_request != NULL);
+ dout("%s: obj %p (was %d)\n", __func__, obj_request,
+ atomic_read(&obj_request->kref.refcount));
kref_put(&obj_request->kref, rbd_obj_request_destroy);
}
static void rbd_img_request_get(struct rbd_img_request *img_request)
{
+ dout("%s: img %p (was %d)\n", __func__, img_request,
+ atomic_read(&img_request->kref.refcount));
kref_get(&img_request->kref);
}
static void rbd_img_request_put(struct rbd_img_request *img_request)
{
rbd_assert(img_request != NULL);
+ dout("%s: img %p (was %d)\n", __func__, img_request,
+ atomic_read(&img_request->kref.refcount));
kref_put(&img_request->kref, rbd_img_request_destroy);
}
static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
{
+ rbd_assert(obj_request->img_request == NULL);
+
rbd_obj_request_get(obj_request);
obj_request->img_request = img_request;
- list_add_tail(&obj_request->links, &img_request->obj_requests);
- obj_request->which = img_request->obj_request_count++;
+ obj_request->which = img_request->obj_request_count;
rbd_assert(obj_request->which != BAD_WHICH);
+ img_request->obj_request_count++;
+ list_add_tail(&obj_request->links, &img_request->obj_requests);
+ dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+ obj_request->which);
}
static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
struct rbd_obj_request *obj_request)
{
rbd_assert(obj_request->which != BAD_WHICH);
- obj_request->which = BAD_WHICH;
+
+ dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
+ obj_request->which);
list_del(&obj_request->links);
+ rbd_assert(img_request->obj_request_count > 0);
+ img_request->obj_request_count--;
+ rbd_assert(obj_request->which == img_request->obj_request_count);
+ obj_request->which = BAD_WHICH;
rbd_assert(obj_request->img_request == img_request);
- obj_request->callback = NULL;
obj_request->img_request = NULL;
+ obj_request->callback = NULL;
rbd_obj_request_put(obj_request);
}
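+
+/*
+ * Object requests are torn down with for_each_obj_request_safe(),
+ * which walks the list in reverse, so removals happen in the
+ * opposite order of additions.  That is what makes the assertion
+ * that "which" equals the decremented obj_request_count hold.
+ */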
static bool obj_request_type_valid(enum obj_request_type type)
{
switch (type) {
+ case OBJ_REQUEST_NODATA:
case OBJ_REQUEST_BIO:
case OBJ_REQUEST_PAGES:
return true;
}
}
-struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
+static struct ceph_osd_req_op *rbd_osd_req_op_create(u16 opcode, ...)
{
struct ceph_osd_req_op *op;
va_list args;
if (opcode == CEPH_OSD_OP_WRITE)
op->payload_len = op->extent.length;
break;
+ case CEPH_OSD_OP_STAT:
+ break;
case CEPH_OSD_OP_CALL:
/* rbd_osd_req_op_create(CALL, class, method, data, datalen) */
op->cls.class_name = va_arg(args, char *);
kfree(op);
}
-/*
- * Send ceph osd request
- */
-static int rbd_do_request(struct request *rq,
- struct rbd_device *rbd_dev,
- struct ceph_snap_context *snapc,
- u64 snapid,
- const char *object_name, u64 ofs, u64 len,
- struct bio *bio,
- struct page **pages,
- int num_pages,
- int flags,
- struct ceph_osd_req_op *op,
- void (*rbd_cb)(struct ceph_osd_request *,
- struct ceph_msg *),
- u64 *ver)
-{
- struct ceph_osd_client *osdc;
- struct ceph_osd_request *osd_req;
- struct timespec mtime = CURRENT_TIME;
- int ret;
-
- dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n",
- object_name, (unsigned long long) ofs,
- (unsigned long long) len);
-
- osdc = &rbd_dev->rbd_client->client->osdc;
- osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_NOIO);
- if (!osd_req)
- return -ENOMEM;
-
- osd_req->r_flags = flags;
- osd_req->r_pages = pages;
- if (bio) {
- osd_req->r_bio = bio;
- bio_get(osd_req->r_bio);
- }
-
- osd_req->r_callback = rbd_cb;
- osd_req->r_priv = NULL;
-
- strncpy(osd_req->r_oid, object_name, sizeof(osd_req->r_oid));
- osd_req->r_oid_len = strlen(osd_req->r_oid);
-
- osd_req->r_file_layout = rbd_dev->layout; /* struct */
- osd_req->r_num_pages = calc_pages_for(ofs, len);
- osd_req->r_page_alignment = ofs & ~PAGE_MASK;
-
- ceph_osdc_build_request(osd_req, ofs, len, 1, op,
- snapc, snapid, &mtime);
-
- if (op->op == CEPH_OSD_OP_WATCH && op->watch.flag) {
- ceph_osdc_set_request_linger(osdc, osd_req);
- rbd_dev->watch_request = osd_req;
- }
-
- ret = ceph_osdc_start_request(osdc, osd_req, false);
- if (ret < 0)
- goto done_err;
-
- if (!rbd_cb) {
- u64 version;
-
- ret = ceph_osdc_wait_request(osdc, osd_req);
- version = le64_to_cpu(osd_req->r_reassert_version.version);
- if (ver)
- *ver = version;
- dout("reassert_ver=%llu\n", (unsigned long long) version);
- ceph_osdc_put_request(osd_req);
- }
- return ret;
-
-done_err:
- if (bio)
- bio_chain_put(osd_req->r_bio);
- ceph_osdc_put_request(osd_req);
-
- return ret;
-}
-
-static void rbd_simple_req_cb(struct ceph_osd_request *osd_req,
- struct ceph_msg *msg)
-{
- ceph_osdc_put_request(osd_req);
-}
-
-/*
- * Do a synchronous ceph osd operation
- */
-static int rbd_req_sync_op(struct rbd_device *rbd_dev,
- int flags,
- struct ceph_osd_req_op *op,
- const char *object_name,
- u64 ofs, u64 inbound_size,
- char *inbound,
- u64 *ver)
-{
- int ret;
- struct page **pages;
- int num_pages;
-
- rbd_assert(op != NULL);
-
- num_pages = calc_pages_for(ofs, inbound_size);
- pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
- if (IS_ERR(pages))
- return PTR_ERR(pages);
-
- ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
- object_name, ofs, inbound_size, NULL,
- pages, num_pages,
- flags,
- op,
- NULL,
- ver);
- if (ret < 0)
- goto done;
-
- if ((flags & CEPH_OSD_FLAG_READ) && inbound)
- ret = ceph_copy_from_page_vector(pages, inbound, ofs, ret);
-
-done:
- ceph_release_page_vector(pages, num_pages);
- return ret;
-}
-
static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
struct rbd_obj_request *obj_request)
{
+ dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
+
return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
}
static void rbd_img_request_complete(struct rbd_img_request *img_request)
{
+ dout("%s: img %p\n", __func__, img_request);
if (img_request->callback)
img_request->callback(img_request);
else
static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
{
+ dout("%s: obj %p\n", __func__, obj_request);
+
return wait_for_completion_interruptible(&obj_request->completion);
}
-static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
+static void obj_request_done_init(struct rbd_obj_request *obj_request)
{
- if (obj_request->callback)
- obj_request->callback(obj_request);
- else
- complete_all(&obj_request->completion);
+ atomic_set(&obj_request->done, 0);
+ smp_wmb();
}
-/*
- * Request sync osd watch
- */
-static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
- u64 ver,
- u64 notify_id)
+static void obj_request_done_set(struct rbd_obj_request *obj_request)
{
- struct ceph_osd_req_op *op;
- int ret;
-
- op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
- if (!op)
- return -ENOMEM;
-
- ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
- rbd_dev->header_name, 0, 0, NULL,
- NULL, 0,
- CEPH_OSD_FLAG_READ,
- op,
- rbd_simple_req_cb, NULL);
+ int done;
- rbd_osd_req_op_destroy(op);
+ done = atomic_inc_return(&obj_request->done);
+ if (done > 1) {
+ struct rbd_img_request *img_request = obj_request->img_request;
+ struct rbd_device *rbd_dev;
- return ret;
+ rbd_dev = img_request ? img_request->rbd_dev : NULL;
+ rbd_warn(rbd_dev, "obj_request %p was already done\n",
+ obj_request);
+ }
}
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+static bool obj_request_done_test(struct rbd_obj_request *obj_request)
{
- struct rbd_device *rbd_dev = (struct rbd_device *)data;
- u64 hver;
- int rc;
-
- if (!rbd_dev)
- return;
-
- dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
- rbd_dev->header_name, (unsigned long long) notify_id,
- (unsigned int) opcode);
- rc = rbd_dev_refresh(rbd_dev, &hver);
- if (rc)
- rbd_warn(rbd_dev, "got notification but failed to "
- " update snaps: %d\n", rc);
-
- rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
+ smp_mb();
+ return atomic_read(&obj_request->done) != 0;
}
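+
+/*
+ * The barriers here (smp_wmb() after initializing the done flag,
+ * smp_mb() before testing it, and the implicit full barrier in
+ * atomic_inc_return() when setting it) are meant to ensure a caller
+ * that observes the flag as set also observes the object request's
+ * final result and xferred values.
+ */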
-/*
- * Request sync osd watch/unwatch. The value of "start" determines
- * whether a watch request is being initiated or torn down.
- */
-static int rbd_req_sync_watch(struct rbd_device *rbd_dev, int start)
+static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
{
- struct ceph_osd_req_op *op;
- int ret = 0;
-
- rbd_assert(start ^ !!rbd_dev->watch_event);
- rbd_assert(start ^ !!rbd_dev->watch_request);
-
- if (start) {
- struct ceph_osd_client *osdc;
-
- osdc = &rbd_dev->rbd_client->client->osdc;
- ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0, rbd_dev,
- &rbd_dev->watch_event);
- if (ret < 0)
- return ret;
- }
-
- op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
- rbd_dev->watch_event->cookie,
- rbd_dev->header.obj_version, start);
- if (op)
- ret = rbd_req_sync_op(rbd_dev,
- CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
- op, rbd_dev->header_name,
- 0, 0, NULL, NULL);
-
- /* Cancel the event if we're tearing down, or on error */
-
- if (!start || !op || ret < 0) {
- ceph_osdc_cancel_event(rbd_dev->watch_event);
- rbd_dev->watch_event = NULL;
- }
- rbd_osd_req_op_destroy(op);
-
- return ret;
+ dout("%s: obj %p cb %p\n", __func__, obj_request,
+ obj_request->callback);
+ if (obj_request->callback)
+ obj_request->callback(obj_request);
+ else
+ complete_all(&obj_request->completion);
}
-/*
- * Synchronous osd object method call
- */
-static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
- const char *object_name,
- const char *class_name,
- const char *method_name,
- const char *outbound,
- size_t outbound_size,
- char *inbound,
- size_t inbound_size,
- u64 *ver)
+static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
- struct ceph_osd_req_op *op;
- int ret;
-
- /*
- * Any input parameters required by the method we're calling
- * will be sent along with the class and method names as
- * part of the message payload. That data and its size are
- * supplied via the indata and indata_len fields (named from
- * the perspective of the server side) in the OSD request
- * operation.
- */
- op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
- method_name, outbound, outbound_size);
- if (!op)
- return -ENOMEM;
-
- ret = rbd_req_sync_op(rbd_dev, CEPH_OSD_FLAG_READ, op,
- object_name, 0, inbound_size, inbound,
- ver);
-
- rbd_osd_req_op_destroy(op);
-
- dout("cls_exec returned %d\n", ret);
- return ret;
+ dout("%s: obj %p\n", __func__, obj_request);
+ obj_request_done_set(obj_request);
}
-static void rbd_osd_read_callback(struct rbd_obj_request *obj_request,
- struct ceph_osd_op *op)
+static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
{
- u64 xferred;
-
+ dout("%s: obj %p result %d %llu/%llu\n", __func__, obj_request,
+ obj_request->result, obj_request->xferred, obj_request->length);
/*
- * We support a 64-bit length, but ultimately it has to be
- * passed to blk_end_request(), which takes an unsigned int.
+ * ENOENT means a hole in the object. We zero-fill the
+ * entire length of the request. A short read also implies
+ * zero-fill to the end of the request. Either way we
+ * update the xferred count to indicate the whole request
+ * was satisfied.
*/
- xferred = le64_to_cpu(op->extent.length);
- rbd_assert(xferred < (u64) UINT_MAX);
- if (obj_request->result == (s32) -ENOENT) {
+ if (obj_request->result == -ENOENT) {
zero_bio_chain(obj_request->bio_list, 0);
obj_request->result = 0;
- } else if (xferred < obj_request->length && !obj_request->result) {
- zero_bio_chain(obj_request->bio_list, xferred);
- xferred = obj_request->length;
+ obj_request->xferred = obj_request->length;
+ } else if (obj_request->xferred < obj_request->length &&
+ !obj_request->result) {
+ zero_bio_chain(obj_request->bio_list, obj_request->xferred);
+ obj_request->xferred = obj_request->length;
}
- obj_request->xferred = xferred;
- atomic_set(&obj_request->done, 1);
+ obj_request_done_set(obj_request);
+}
+
+static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
+{
+ dout("%s: obj %p result %d %llu\n", __func__, obj_request,
+ obj_request->result, obj_request->length);
+ /*
+ * There is no such thing as a successful short write.
+ * Our xferred value is the number of bytes transferred
+ * back. Set it to our originally-requested length.
+ */
+ obj_request->xferred = obj_request->length;
+ obj_request_done_set(obj_request);
}
-static void rbd_osd_write_callback(struct rbd_obj_request *obj_request,
- struct ceph_osd_op *op)
+/*
+ * For a simple stat call there's nothing to do. We'll do more if
+ * this is part of a write sequence for a layered image.
+ */
+static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
- obj_request->xferred = le64_to_cpu(op->extent.length);
- atomic_set(&obj_request->done, 1);
+ dout("%s: obj %p\n", __func__, obj_request);
+ obj_request_done_set(obj_request);
}
static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
struct ceph_msg *msg)
{
struct rbd_obj_request *obj_request = osd_req->r_priv;
- struct ceph_osd_reply_head *reply_head;
- struct ceph_osd_op *op;
- u32 num_ops;
u16 opcode;
+ dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
rbd_assert(osd_req == obj_request->osd_req);
rbd_assert(!!obj_request->img_request ^
(obj_request->which == BAD_WHICH));
- obj_request->xferred = le32_to_cpu(msg->hdr.data_len);
- reply_head = msg->front.iov_base;
- obj_request->result = (s32) le32_to_cpu(reply_head->result);
+ if (osd_req->r_result < 0)
+ obj_request->result = osd_req->r_result;
obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
- num_ops = le32_to_cpu(reply_head->num_ops);
- WARN_ON(num_ops != 1); /* For now */
+ WARN_ON(osd_req->r_num_ops != 1); /* For now */
- op = &reply_head->ops[0];
- opcode = le16_to_cpu(op->op);
+ /*
+ * We support a 64-bit length, but ultimately it has to be
+ * passed to blk_end_request(), which takes an unsigned int.
+ */
+ obj_request->xferred = osd_req->r_reply_op_len[0];
+ rbd_assert(obj_request->xferred < (u64) UINT_MAX);
+ opcode = osd_req->r_request_ops[0].op;
switch (opcode) {
case CEPH_OSD_OP_READ:
- rbd_osd_read_callback(obj_request, op);
+ rbd_osd_read_callback(obj_request);
break;
case CEPH_OSD_OP_WRITE:
- rbd_osd_write_callback(obj_request, op);
+ rbd_osd_write_callback(obj_request);
+ break;
+ case CEPH_OSD_OP_STAT:
+ rbd_osd_stat_callback(obj_request);
+ break;
+ case CEPH_OSD_OP_CALL:
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ case CEPH_OSD_OP_WATCH:
+ rbd_osd_trivial_callback(obj_request);
break;
default:
rbd_warn(NULL, "%s: unsupported op %hu\n",
break;
}
- if (atomic_read(&obj_request->done))
+ if (obj_request_done_test(obj_request))
rbd_obj_request_complete(obj_request);
}
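+
+/*
+ * Note that obj_request->result is only overwritten when r_result is
+ * negative; for a successful request the per-op transfer count comes
+ * from r_reply_op_len[0] rather than from r_result.
+ */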
rbd_assert(obj_request_type_valid(obj_request->type));
switch (obj_request->type) {
+ case OBJ_REQUEST_NODATA:
+ break; /* Nothing to do */
case OBJ_REQUEST_BIO:
rbd_assert(obj_request->bio_list != NULL);
osd_req->r_bio = obj_request->bio_list;
- bio_get(osd_req->r_bio);
- /* osd client requires "num pages" even for bio */
- osd_req->r_num_pages = calc_pages_for(offset, length);
break;
case OBJ_REQUEST_PAGES:
osd_req->r_pages = obj_request->pages;
obj_request->which = BAD_WHICH;
obj_request->type = type;
INIT_LIST_HEAD(&obj_request->links);
- atomic_set(&obj_request->done, 0);
+ obj_request_done_init(obj_request);
init_completion(&obj_request->completion);
kref_init(&obj_request->kref);
+ dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
+ offset, length, (int)type, obj_request);
+
return obj_request;
}
obj_request = container_of(kref, struct rbd_obj_request, kref);
+ dout("%s: obj %p\n", __func__, obj_request);
+
rbd_assert(obj_request->img_request == NULL);
rbd_assert(obj_request->which == BAD_WHICH);
rbd_assert(obj_request_type_valid(obj_request->type));
switch (obj_request->type) {
+ case OBJ_REQUEST_NODATA:
+ break; /* Nothing to do */
case OBJ_REQUEST_BIO:
if (obj_request->bio_list)
bio_chain_put(obj_request->bio_list);
* that comprises the image request, and the Linux request pointer
* (if there is one).
*/
-struct rbd_img_request *rbd_img_request_create(struct rbd_device *rbd_dev,
+static struct rbd_img_request *rbd_img_request_create(
+ struct rbd_device *rbd_dev,
u64 offset, u64 length,
bool write_request)
{
rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */
+ dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
+ write_request ? "write" : "read", offset, length,
+ img_request);
+
return img_request;
}
img_request = container_of(kref, struct rbd_img_request, kref);
+ dout("%s: img %p\n", __func__, img_request);
+
for_each_obj_request_safe(img_request, obj_request, next_obj_request)
rbd_img_obj_request_del(img_request, obj_request);
+ rbd_assert(img_request->obj_request_count == 0);
if (img_request->write_request)
ceph_put_snap_context(img_request->snapc);
u64 resid;
u16 opcode;
+ dout("%s: img %p bio %p\n", __func__, img_request, bio_list);
+
opcode = img_request->write_request ? CEPH_OSD_OP_WRITE
: CEPH_OSD_OP_READ;
bio_offset = 0;
image_offset = img_request->offset;
rbd_assert(image_offset == bio_list->bi_sector << SECTOR_SHIFT);
resid = img_request->length;
+ rbd_assert(resid > 0);
while (resid) {
const char *object_name;
unsigned int clone_size;
bool more = true;
img_request = obj_request->img_request;
+
+ dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
rbd_assert(img_request != NULL);
rbd_assert(img_request->rq != NULL);
+ rbd_assert(img_request->obj_request_count > 0);
rbd_assert(which != BAD_WHICH);
rbd_assert(which < img_request->obj_request_count);
rbd_assert(which >= img_request->next_completion);
rbd_assert(more);
rbd_assert(which < img_request->obj_request_count);
- if (!atomic_read(&obj_request->done))
+ if (!obj_request_done_test(obj_request))
break;
rbd_assert(obj_request->xferred <= (u64) UINT_MAX);
more = blk_end_request(img_request->rq, result, xferred);
which++;
}
+
rbd_assert(more ^ (which == img_request->obj_request_count));
img_request->next_completion = which;
out:
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
+ dout("%s: img %p\n", __func__, img_request);
for_each_obj_request(img_request, obj_request) {
int ret;
return 0;
}
+static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
+ u64 ver, u64 notify_id)
+{
+ struct rbd_obj_request *obj_request;
+ struct ceph_osd_req_op *op;
+ struct ceph_osd_client *osdc;
+ int ret;
+
+ obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+ OBJ_REQUEST_NODATA);
+ if (!obj_request)
+ return -ENOMEM;
+
+ ret = -ENOMEM;
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver);
+ if (!op)
+ goto out;
+ obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+ obj_request, op);
+ rbd_osd_req_op_destroy(op);
+ if (!obj_request->osd_req)
+ goto out;
+
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ obj_request->callback = rbd_obj_request_put;
+ ret = rbd_obj_request_submit(osdc, obj_request);
+out:
+ if (ret)
+ rbd_obj_request_put(obj_request);
+
+ return ret;
+}
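+
+/*
+ * The notify ack is fire-and-forget: its callback is simply
+ * rbd_obj_request_put(), so the object request drops its only
+ * reference (and goes away) when the ack completes, and nobody
+ * waits for it.
+ */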
+
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+ struct rbd_device *rbd_dev = (struct rbd_device *)data;
+ u64 hver;
+ int rc;
+
+ if (!rbd_dev)
+ return;
+
+ dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
+ rbd_dev->header_name, (unsigned long long) notify_id,
+ (unsigned int) opcode);
+ rc = rbd_dev_refresh(rbd_dev, &hver);
+ if (rc)
+		rbd_warn(rbd_dev, "got notification but failed to "
+			 "update snaps: %d\n", rc);
+
+ rbd_obj_notify_ack(rbd_dev, hver, notify_id);
+}
+
+/*
+ * Request sync osd watch/unwatch. The value of "start" determines
+ * whether a watch request is being initiated or torn down.
+ */
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
+{
+ struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+ struct rbd_obj_request *obj_request;
+ struct ceph_osd_req_op *op;
+ int ret;
+
+ rbd_assert(start ^ !!rbd_dev->watch_event);
+ rbd_assert(start ^ !!rbd_dev->watch_request);
+
+ if (start) {
+ ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
+ &rbd_dev->watch_event);
+ if (ret < 0)
+ return ret;
+ rbd_assert(rbd_dev->watch_event != NULL);
+ }
+
+ ret = -ENOMEM;
+ obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
+ OBJ_REQUEST_NODATA);
+ if (!obj_request)
+ goto out_cancel;
+
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_WATCH,
+ rbd_dev->watch_event->cookie,
+ rbd_dev->header.obj_version, start);
+ if (!op)
+ goto out_cancel;
+ obj_request->osd_req = rbd_osd_req_create(rbd_dev, true,
+ obj_request, op);
+ rbd_osd_req_op_destroy(op);
+ if (!obj_request->osd_req)
+ goto out_cancel;
+
+ if (start)
+ ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
+ else
+ ceph_osdc_unregister_linger_request(osdc,
+ rbd_dev->watch_request->osd_req);
+ ret = rbd_obj_request_submit(osdc, obj_request);
+ if (ret)
+ goto out_cancel;
+ ret = rbd_obj_request_wait(obj_request);
+ if (ret)
+ goto out_cancel;
+ ret = obj_request->result;
+ if (ret)
+ goto out_cancel;
+
+ /*
+ * A watch request is set to linger, so the underlying osd
+ * request won't go away until we unregister it. We retain
+ * a pointer to the object request during that time (in
+ * rbd_dev->watch_request), so we'll keep a reference to
+ * it. We'll drop that reference (below) after we've
+ * unregistered it.
+ */
+ if (start) {
+ rbd_dev->watch_request = obj_request;
+
+ return 0;
+ }
+
+ /* We have successfully torn down the watch request */
+
+ rbd_obj_request_put(rbd_dev->watch_request);
+ rbd_dev->watch_request = NULL;
+out_cancel:
+ /* Cancel the event if we're tearing down, or on error */
+ ceph_osdc_cancel_event(rbd_dev->watch_event);
+ rbd_dev->watch_event = NULL;
+ if (obj_request)
+ rbd_obj_request_put(obj_request);
+
+ return ret;
+}
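+
+/*
+ * On start, the object request just submitted is retained in
+ * rbd_dev->watch_request and its osd request lingers.  On teardown,
+ * the linger registration is removed, the old watch request's
+ * reference is dropped, and the fall-through into out_cancel then
+ * cancels the event and releases the unwatch request itself.
+ */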
+
+/*
+ * Synchronous osd object method call
+ */
+static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
+ const char *object_name,
+ const char *class_name,
+ const char *method_name,
+ const char *outbound,
+ size_t outbound_size,
+ char *inbound,
+ size_t inbound_size,
+ u64 *version)
+{
+ struct rbd_obj_request *obj_request;
+ struct ceph_osd_client *osdc;
+ struct ceph_osd_req_op *op;
+ struct page **pages;
+ u32 page_count;
+ int ret;
+
+ /*
+ * Method calls are ultimately read operations but they
+ * don't involve object data (so no offset or length).
+	 * The result should be placed into the inbound buffer
+ * provided. They also supply outbound data--parameters for
+ * the object method. Currently if this is present it will
+ * be a snapshot id.
+ */
+ page_count = (u32) calc_pages_for(0, inbound_size);
+ pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ ret = -ENOMEM;
+ obj_request = rbd_obj_request_create(object_name, 0, 0,
+ OBJ_REQUEST_PAGES);
+ if (!obj_request)
+ goto out;
+
+ obj_request->pages = pages;
+ obj_request->page_count = page_count;
+
+ op = rbd_osd_req_op_create(CEPH_OSD_OP_CALL, class_name,
+ method_name, outbound, outbound_size);
+ if (!op)
+ goto out;
+ obj_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+ obj_request, op);
+ rbd_osd_req_op_destroy(op);
+ if (!obj_request->osd_req)
+ goto out;
+
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ ret = rbd_obj_request_submit(osdc, obj_request);
+ if (ret)
+ goto out;
+ ret = rbd_obj_request_wait(obj_request);
+ if (ret)
+ goto out;
+
+ ret = obj_request->result;
+ if (ret < 0)
+ goto out;
+ ret = 0;
+ ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
+ if (version)
+ *version = obj_request->version;
+out:
+ if (obj_request)
+ rbd_obj_request_put(obj_request);
+ else
+ ceph_release_page_vector(pages, page_count);
+
+ return ret;
+}
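+
+/*
+ * The out: path releases the page vector directly only when the
+ * object request could not be created; otherwise the pages are
+ * assumed to be owned, and eventually freed, by the object request
+ * once they have been attached to it.
+ */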
+
static void rbd_request_fn(struct request_queue *q)
+ __releases(q->queue_lock) __acquires(q->queue_lock)
{
struct rbd_device *rbd_dev = q->queuedata;
bool read_only = rbd_dev->mapping.read_only;
/* Ignore any non-FS requests that filter through. */
if (rq->cmd_type != REQ_TYPE_FS) {
+ dout("%s: non-fs request type %d\n", __func__,
+ (int) rq->cmd_type);
+ __blk_end_request_all(rq, 0);
+ continue;
+ }
+
+ /* Ignore/skip any zero-length requests */
+
+ offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
+ length = (u64) blk_rq_bytes(rq);
+
+ if (!length) {
+ dout("%s: zero-length request\n", __func__);
__blk_end_request_all(rq, 0);
continue;
}
rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
}
- /* Quit early if the snapshot has disappeared */
-
- if (!atomic_read(&rbd_dev->exists)) {
+ /*
+ * Quit early if the mapped snapshot no longer
+ * exists. It's still possible the snapshot will
+ * have disappeared by the time our request arrives
+ * at the osd, but there's no sense in sending it if
+ * we already know.
+ */
+ if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
dout("request for non-existent snapshot");
rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
result = -ENXIO;
goto end_request;
}
- offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
- length = (u64) blk_rq_bytes(rq);
-
result = -EINVAL;
if (WARN_ON(offset && length > U64_MAX - offset + 1))
goto end_request; /* Shouldn't happen */
struct ceph_osd_client *osdc;
struct page **pages = NULL;
u32 page_count;
+ size_t size;
int ret;
page_count = (u32) calc_pages_for(offset, length);
ret = -ENOMEM;
obj_request = rbd_obj_request_create(object_name, offset, length,
- OBJ_REQUEST_PAGES);
+ OBJ_REQUEST_PAGES);
if (!obj_request)
goto out;
ret = obj_request->result;
if (ret < 0)
goto out;
- ret = ceph_copy_from_page_vector(pages, buf, 0, obj_request->xferred);
+
+ rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
+ size = (size_t) obj_request->xferred;
+ ceph_copy_from_page_vector(pages, buf, 0, size);
+ rbd_assert(size <= (size_t) INT_MAX);
+ ret = (int) size;
if (version)
*version = obj_request->version;
out:
ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
0, size,
(char *) ondisk, version);
-
if (ret < 0)
goto out_err;
if (WARN_ON((size_t) ret < size)) {
kfree(spec);
}
-struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
struct rbd_spec *spec)
{
struct rbd_device *rbd_dev;
return NULL;
spin_lock_init(&rbd_dev->lock);
- atomic_set(&rbd_dev->exists, 0);
+ rbd_dev->flags = 0;
INIT_LIST_HEAD(&rbd_dev->node);
INIT_LIST_HEAD(&rbd_dev->snaps);
init_rwsem(&rbd_dev->header_rwsem);
__le64 size;
} __attribute__ ((packed)) size_buf = { 0 };
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_size",
(char *) &snapid, sizeof (snapid),
(char *) &size_buf, sizeof (size_buf), NULL);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
return ret;
if (!reply_buf)
return -ENOMEM;
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_object_prefix",
NULL, 0,
reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out;
- ret = 0; /* rbd_req_sync_exec() can return positive */
p = reply_buf;
rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
u64 incompat;
int ret;
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_features",
(char *) &snapid, sizeof (snapid),
(char *) &features_buf, sizeof (features_buf),
NULL);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
return ret;
}
snapid = cpu_to_le64(CEPH_NOSNAP);
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_parent",
(char *) &snapid, sizeof (snapid),
(char *) reply_buf, size, NULL);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out_err;
if (!reply_buf)
goto out;
- ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+ ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
"rbd", "dir_get_name",
image_id, image_id_size,
(char *) reply_buf, size, NULL);
if (!reply_buf)
return -ENOMEM;
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_snapcontext",
NULL, 0,
reply_buf, size, ver);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out;
return ERR_PTR(-ENOMEM);
snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
- ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+ ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
"rbd", "get_snapshot_name",
(char *) &snap_id, sizeof (snap_id),
reply_buf, size, NULL);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out;
if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
struct list_head *next = links->next;
- /* Existing snapshot not in the new snap context */
-
+ /*
+ * A previously-existing snapshot is not in
+ * the new snap context.
+ *
+ * If the now missing snapshot is the one the
+ * image is mapped to, clear its exists flag
+ * so we can avoid sending any more requests
+ * to it.
+ */
if (rbd_dev->spec->snap_id == snap->id)
- atomic_set(&rbd_dev->exists, 0);
+ clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
rbd_remove_snap_dev(snap);
dout("%ssnap id %llu has been removed\n",
rbd_dev->spec->snap_id == snap->id ?
struct rbd_snap *snap;
int ret = 0;
- dout("%s called\n", __func__);
+ dout("%s:\n", __func__);
if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
return -EIO;
goto out;
}
- ret = rbd_req_sync_exec(rbd_dev, object_name,
+ ret = rbd_obj_method_sync(rbd_dev, object_name,
"rbd", "get_id",
NULL, 0,
response, RBD_IMAGE_ID_LEN_MAX, NULL);
- dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+ dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
if (ret < 0)
goto out;
- ret = 0; /* rbd_req_sync_exec() can return positive */
p = response;
rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
if (ret)
goto err_out_bus;
- ret = rbd_req_sync_watch(rbd_dev, 1);
+ ret = rbd_dev_header_watch_sync(rbd_dev, 1);
if (ret)
goto err_out_bus;
{
struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
- if (rbd_dev->watch_request) {
- struct ceph_client *client = rbd_dev->rbd_client->client;
-
- ceph_osdc_unregister_linger_request(&client->osdc,
- rbd_dev->watch_request);
- }
if (rbd_dev->watch_event)
- rbd_req_sync_watch(rbd_dev, 0);
+ rbd_dev_header_watch_sync(rbd_dev, 0);
/* clean up and free blkdev */
rbd_free_disk(rbd_dev);
goto done;
}
- if (rbd_dev->open_count) {
+ spin_lock_irq(&rbd_dev->lock);
+ if (rbd_dev->open_count)
ret = -EBUSY;
+ else
+ set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
+ spin_unlock_irq(&rbd_dev->lock);
+ if (ret < 0)
goto done;
- }
rbd_remove_all_snaps(rbd_dev);
rbd_bus_del_dev(rbd_dev);
device_unregister(&rbd_root_dev);
}
-int __init rbd_init(void)
+static int __init rbd_init(void)
{
int rc;
+ if (!libceph_compatible(NULL)) {
+ rbd_warn(NULL, "libceph incompatibility (quitting)");
+
+ return -EINVAL;
+ }
rc = rbd_sysfs_init();
if (rc)
return rc;
return 0;
}
-void __exit rbd_exit(void)
+static void __exit rbd_exit(void)
{
rbd_sysfs_cleanup();
}