2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
84 size_t snap_names_len;
99 * an instance of the client. multiple devices may share an rbd client.
102 struct ceph_client *client;
103 struct rbd_options *rbd_opts;
105 struct list_head node;
109 * a request completion status
111 struct rbd_req_status {
118 * a collection of requests
120 struct rbd_req_coll {
124 struct rbd_req_status status[0];
128 * a single io request
131 struct request *rq; /* blk layer request */
132 struct bio *bio; /* cloned bio */
133 struct page **pages; /* list of used pages */
136 struct rbd_req_coll *coll;
143 struct list_head node;
151 int id; /* blkdev unique id */
153 int major; /* blkdev assigned major */
154 struct gendisk *disk; /* blkdev's gendisk and rq */
155 struct request_queue *q;
157 struct rbd_client *rbd_client;
159 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
161 spinlock_t lock; /* queue lock */
163 struct rbd_image_header header;
164 char *obj; /* rbd image name */
166 char *obj_md_name; /* hdr nm. */
170 struct ceph_osd_event *watch_event;
171 struct ceph_osd_request *watch_request;
173 /* protects updating the header */
174 struct rw_semaphore header_rwsem;
176 u64 snap_id; /* current snapshot id */
179 struct list_head node;
181 /* list of snapshots */
182 struct list_head snaps;
188 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
190 static LIST_HEAD(rbd_dev_list); /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock);
193 static LIST_HEAD(rbd_client_list); /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock);
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199 struct device_attribute *attr,
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203 struct rbd_snap *snap);
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210 static struct bus_attribute rbd_bus_attrs[] = {
211 __ATTR(add, S_IWUSR, NULL, rbd_add),
212 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
216 static struct bus_type rbd_bus_type = {
218 .bus_attrs = rbd_bus_attrs,
221 static void rbd_root_dev_release(struct device *dev)
225 static struct device rbd_root_dev = {
227 .release = rbd_root_dev_release,
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
233 return get_device(&rbd_dev->dev);
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
238 put_device(&rbd_dev->dev);
241 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
245 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
247 rbd_get_dev(rbd_dev);
249 set_device_ro(bdev, rbd_dev->read_only);
251 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 struct rbd_device *rbd_dev = disk->private_data;
261 rbd_put_dev(rbd_dev);
266 static const struct block_device_operations rbd_bd_ops = {
267 .owner = THIS_MODULE,
269 .release = rbd_release,
273 * Initialize an rbd client instance.
276 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
277 struct rbd_options *rbd_opts)
279 struct rbd_client *rbdc;
282 dout("rbd_client_create\n");
283 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
287 kref_init(&rbdc->kref);
288 INIT_LIST_HEAD(&rbdc->node);
290 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
293 if (IS_ERR(rbdc->client))
295 opt = NULL; /* Now rbdc->client is responsible for opt */
297 ret = ceph_open_session(rbdc->client);
301 rbdc->rbd_opts = rbd_opts;
303 spin_lock(&rbd_client_list_lock);
304 list_add_tail(&rbdc->node, &rbd_client_list);
305 spin_unlock(&rbd_client_list_lock);
307 mutex_unlock(&ctl_mutex);
309 dout("rbd_client_create created %p\n", rbdc);
313 ceph_destroy_client(rbdc->client);
315 mutex_unlock(&ctl_mutex);
319 ceph_destroy_options(opt);
324 * Find a ceph client with specific addr and configuration.
326 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
328 struct rbd_client *client_node;
330 if (opt->flags & CEPH_OPT_NOSHARE)
333 list_for_each_entry(client_node, &rbd_client_list, node)
334 if (ceph_compare_options(opt, client_node->client) == 0)
347 /* string args above */
350 static match_table_t rbdopt_tokens = {
351 {Opt_notify_timeout, "notify_timeout=%d"},
353 /* string args above */
357 static int parse_rbd_opts_token(char *c, void *private)
359 struct rbd_options *rbdopt = private;
360 substring_t argstr[MAX_OPT_ARGS];
361 int token, intval, ret;
363 token = match_token(c, rbdopt_tokens, argstr);
367 if (token < Opt_last_int) {
368 ret = match_int(&argstr[0], &intval);
370 pr_err("bad mount option arg (not int) "
374 dout("got int token %d val %d\n", token, intval);
375 } else if (token > Opt_last_int && token < Opt_last_string) {
376 dout("got string token %d val %s\n", token,
379 dout("got token %d\n", token);
383 case Opt_notify_timeout:
384 rbdopt->notify_timeout = intval;
393 * Get a ceph client with specific addr and configuration, if one does
394 * not exist create it.
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
400 struct rbd_client *rbdc;
401 struct ceph_options *opt;
402 struct rbd_options *rbd_opts;
404 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406 return ERR_PTR(-ENOMEM);
408 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410 opt = ceph_parse_options(options, mon_addr,
411 mon_addr + mon_addr_len,
412 parse_rbd_opts_token, rbd_opts);
415 return ERR_CAST(opt);
418 spin_lock(&rbd_client_list_lock);
419 rbdc = __rbd_client_find(opt);
421 /* using an existing client */
422 kref_get(&rbdc->kref);
423 spin_unlock(&rbd_client_list_lock);
425 ceph_destroy_options(opt);
430 spin_unlock(&rbd_client_list_lock);
432 rbdc = rbd_client_create(opt, rbd_opts);
441 * Destroy ceph client
443 * Caller must hold rbd_client_list_lock.
445 static void rbd_client_release(struct kref *kref)
447 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449 dout("rbd_release_client %p\n", rbdc);
450 spin_lock(&rbd_client_list_lock);
451 list_del(&rbdc->node);
452 spin_unlock(&rbd_client_list_lock);
454 ceph_destroy_client(rbdc->client);
455 kfree(rbdc->rbd_opts);
460 * Drop reference to ceph client node. If it's not referenced anymore, release
463 static void rbd_put_client(struct rbd_device *rbd_dev)
465 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466 rbd_dev->rbd_client = NULL;
470 * Destroy requests collection
472 static void rbd_coll_release(struct kref *kref)
474 struct rbd_req_coll *coll =
475 container_of(kref, struct rbd_req_coll, kref);
477 dout("rbd_coll_release %p\n", coll);
482 * Create a new header structure, translate header format from the on-disk
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486 struct rbd_image_header_ondisk *ondisk,
492 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
495 snap_count = le32_to_cpu(ondisk->snap_count);
496 if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
499 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500 snap_count * sizeof(u64),
505 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
507 header->snap_names = kmalloc(header->snap_names_len,
509 if (!header->snap_names)
511 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
513 if (!header->snap_sizes)
516 header->snap_names = NULL;
517 header->snap_sizes = NULL;
520 header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
522 if (!header->object_prefix)
525 memcpy(header->object_prefix, ondisk->block_name,
526 sizeof(ondisk->block_name));
527 header->object_prefix[sizeof (ondisk->block_name)] = '\0';
529 header->image_size = le64_to_cpu(ondisk->image_size);
530 header->obj_order = ondisk->options.order;
531 header->crypt_type = ondisk->options.crypt_type;
532 header->comp_type = ondisk->options.comp_type;
534 atomic_set(&header->snapc->nref, 1);
535 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536 header->snapc->num_snaps = snap_count;
537 header->total_snaps = snap_count;
539 if (snap_count && allocated_snaps == snap_count) {
540 for (i = 0; i < snap_count; i++) {
541 header->snapc->snaps[i] =
542 le64_to_cpu(ondisk->snaps[i].id);
543 header->snap_sizes[i] =
544 le64_to_cpu(ondisk->snaps[i].image_size);
547 /* copy snapshot names */
548 memcpy(header->snap_names, &ondisk->snaps[i],
549 header->snap_names_len);
555 kfree(header->snap_sizes);
557 kfree(header->snap_names);
559 kfree(header->snapc);
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
567 char *p = header->snap_names;
569 for (i = 0; i < header->total_snaps; i++) {
570 if (!strcmp(snap_name, p)) {
572 /* Found it. Pass back its id and/or size */
575 *seq = header->snapc->snaps[i];
577 *size = header->snap_sizes[i];
580 p += strlen(p) + 1; /* Skip ahead to the next name */
585 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
587 struct rbd_image_header *header = &dev->header;
588 struct ceph_snap_context *snapc = header->snapc;
591 down_write(&dev->header_rwsem);
593 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
594 sizeof (RBD_SNAP_HEAD_NAME))) {
595 if (header->total_snaps)
596 snapc->seq = header->snap_seq;
599 dev->snap_id = CEPH_NOSNAP;
602 *size = header->image_size;
604 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
607 dev->snap_id = snapc->seq;
613 up_write(&dev->header_rwsem);
617 static void rbd_header_free(struct rbd_image_header *header)
619 kfree(header->object_prefix);
620 kfree(header->snap_sizes);
621 kfree(header->snap_names);
622 kfree(header->snapc);
626 * get the actual striped segment name, offset and length
628 static u64 rbd_get_segment(struct rbd_image_header *header,
629 const char *object_prefix,
631 char *seg_name, u64 *segofs)
633 u64 seg = ofs >> header->obj_order;
636 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
637 "%s.%012llx", object_prefix, seg);
639 ofs = ofs & ((1 << header->obj_order) - 1);
640 len = min_t(u64, len, (1 << header->obj_order) - ofs);
648 static int rbd_get_num_segments(struct rbd_image_header *header,
651 u64 start_seg = ofs >> header->obj_order;
652 u64 end_seg = (ofs + len - 1) >> header->obj_order;
653 return end_seg - start_seg + 1;
657 * returns the size of an object in the image
659 static u64 rbd_obj_bytes(struct rbd_image_header *header)
661 return 1 << header->obj_order;
668 static void bio_chain_put(struct bio *chain)
674 chain = chain->bi_next;
680 * zeros a bio chain, starting at specific offset
682 static void zero_bio_chain(struct bio *chain, int start_ofs)
691 bio_for_each_segment(bv, chain, i) {
692 if (pos + bv->bv_len > start_ofs) {
693 int remainder = max(start_ofs - pos, 0);
694 buf = bvec_kmap_irq(bv, &flags);
695 memset(buf + remainder, 0,
696 bv->bv_len - remainder);
697 bvec_kunmap_irq(buf, &flags);
702 chain = chain->bi_next;
707 * bio_chain_clone - clone a chain of bios up to a certain length.
708 * might return a bio_pair that will need to be released.
710 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
711 struct bio_pair **bp,
712 int len, gfp_t gfpmask)
714 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
718 bio_pair_release(*bp);
722 while (old_chain && (total < len)) {
723 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
727 if (total + old_chain->bi_size > len) {
731 * this split can only happen with a single paged bio,
732 * split_bio will BUG_ON if this is not the case
734 dout("bio_chain_clone split! total=%d remaining=%d"
736 (int)total, (int)len-total,
737 (int)old_chain->bi_size);
739 /* split the bio. We'll release it either in the next
740 call, or it will have to be released outside */
741 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
745 __bio_clone(tmp, &bp->bio1);
749 __bio_clone(tmp, old_chain);
750 *next = old_chain->bi_next;
754 gfpmask &= ~__GFP_WAIT;
758 new_chain = tail = tmp;
763 old_chain = old_chain->bi_next;
765 total += tmp->bi_size;
771 tail->bi_next = NULL;
778 dout("bio_chain_clone with err\n");
779 bio_chain_put(new_chain);
784 * helpers for osd request op vectors.
786 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
791 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
795 (*ops)[0].op = opcode;
797 * op extent offset and length will be set later on
798 * in calc_raw_layout()
800 (*ops)[0].payload_len = payload_len;
804 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
809 static void rbd_coll_end_req_index(struct request *rq,
810 struct rbd_req_coll *coll,
814 struct request_queue *q;
817 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
818 coll, index, ret, len);
824 blk_end_request(rq, ret, len);
830 spin_lock_irq(q->queue_lock);
831 coll->status[index].done = 1;
832 coll->status[index].rc = ret;
833 coll->status[index].bytes = len;
834 max = min = coll->num_done;
835 while (max < coll->total && coll->status[max].done)
838 for (i = min; i<max; i++) {
839 __blk_end_request(rq, coll->status[i].rc,
840 coll->status[i].bytes);
842 kref_put(&coll->kref, rbd_coll_release);
844 spin_unlock_irq(q->queue_lock);
847 static void rbd_coll_end_req(struct rbd_request *req,
850 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
854 * Send ceph osd request
856 static int rbd_do_request(struct request *rq,
857 struct rbd_device *dev,
858 struct ceph_snap_context *snapc,
860 const char *obj, u64 ofs, u64 len,
865 struct ceph_osd_req_op *ops,
867 struct rbd_req_coll *coll,
869 void (*rbd_cb)(struct ceph_osd_request *req,
870 struct ceph_msg *msg),
871 struct ceph_osd_request **linger_req,
874 struct ceph_osd_request *req;
875 struct ceph_file_layout *layout;
878 struct timespec mtime = CURRENT_TIME;
879 struct rbd_request *req_data;
880 struct ceph_osd_request_head *reqhead;
881 struct ceph_osd_client *osdc;
883 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
886 rbd_coll_end_req_index(rq, coll, coll_index,
892 req_data->coll = coll;
893 req_data->coll_index = coll_index;
896 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
898 down_read(&dev->header_rwsem);
900 osdc = &dev->rbd_client->client->osdc;
901 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
902 false, GFP_NOIO, pages, bio);
904 up_read(&dev->header_rwsem);
909 req->r_callback = rbd_cb;
913 req_data->pages = pages;
916 req->r_priv = req_data;
918 reqhead = req->r_request->front.iov_base;
919 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
921 strncpy(req->r_oid, obj, sizeof(req->r_oid));
922 req->r_oid_len = strlen(req->r_oid);
924 layout = &req->r_file_layout;
925 memset(layout, 0, sizeof(*layout));
926 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
927 layout->fl_stripe_count = cpu_to_le32(1);
928 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929 layout->fl_pg_pool = cpu_to_le32(dev->pool_id);
930 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
933 ceph_osdc_build_request(req, ofs, &len,
937 req->r_oid, req->r_oid_len);
938 up_read(&dev->header_rwsem);
941 ceph_osdc_set_request_linger(osdc, req);
945 ret = ceph_osdc_start_request(osdc, req, false);
950 ret = ceph_osdc_wait_request(osdc, req);
952 *ver = le64_to_cpu(req->r_reassert_version.version);
953 dout("reassert_ver=%lld\n",
954 le64_to_cpu(req->r_reassert_version.version));
955 ceph_osdc_put_request(req);
960 bio_chain_put(req_data->bio);
961 ceph_osdc_put_request(req);
963 rbd_coll_end_req(req_data, ret, len);
969 * Ceph osd op callback
971 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
973 struct rbd_request *req_data = req->r_priv;
974 struct ceph_osd_reply_head *replyhead;
975 struct ceph_osd_op *op;
981 replyhead = msg->front.iov_base;
982 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
983 op = (void *)(replyhead + 1);
984 rc = le32_to_cpu(replyhead->result);
985 bytes = le64_to_cpu(op->extent.length);
986 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
988 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
990 if (rc == -ENOENT && read_op) {
991 zero_bio_chain(req_data->bio, 0);
993 } else if (rc == 0 && read_op && bytes < req_data->len) {
994 zero_bio_chain(req_data->bio, bytes);
995 bytes = req_data->len;
998 rbd_coll_end_req(req_data, rc, bytes);
1001 bio_chain_put(req_data->bio);
1003 ceph_osdc_put_request(req);
1007 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1009 ceph_osdc_put_request(req);
1013 * Do a synchronous ceph osd operation
1015 static int rbd_req_sync_op(struct rbd_device *dev,
1016 struct ceph_snap_context *snapc,
1020 struct ceph_osd_req_op *orig_ops,
1025 struct ceph_osd_request **linger_req,
1029 struct page **pages;
1031 struct ceph_osd_req_op *ops = orig_ops;
1034 num_pages = calc_pages_for(ofs , len);
1035 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1037 return PTR_ERR(pages);
1040 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1041 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1045 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1046 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1052 ret = rbd_do_request(NULL, dev, snapc, snapid,
1053 obj, ofs, len, NULL,
1064 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1065 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1069 rbd_destroy_ops(ops);
1071 ceph_release_page_vector(pages, num_pages);
1076 * Do an asynchronous ceph osd operation
1078 static int rbd_do_op(struct request *rq,
1079 struct rbd_device *rbd_dev ,
1080 struct ceph_snap_context *snapc,
1082 int opcode, int flags, int num_reply,
1085 struct rbd_req_coll *coll,
1092 struct ceph_osd_req_op *ops;
1095 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1099 seg_len = rbd_get_segment(&rbd_dev->header,
1100 rbd_dev->header.object_prefix,
1102 seg_name, &seg_ofs);
1104 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1106 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1110 /* we've taken care of segment sizes earlier when we
1111 cloned the bios. We should never have a segment
1112 truncated at this point */
1113 BUG_ON(seg_len < len);
1115 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1116 seg_name, seg_ofs, seg_len,
1123 rbd_req_cb, 0, NULL);
1125 rbd_destroy_ops(ops);
1132 * Request async osd write
1134 static int rbd_req_write(struct request *rq,
1135 struct rbd_device *rbd_dev,
1136 struct ceph_snap_context *snapc,
1139 struct rbd_req_coll *coll,
1142 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1144 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1146 ofs, len, bio, coll, coll_index);
1150 * Request async osd read
1152 static int rbd_req_read(struct request *rq,
1153 struct rbd_device *rbd_dev,
1157 struct rbd_req_coll *coll,
1160 return rbd_do_op(rq, rbd_dev, NULL,
1165 ofs, len, bio, coll, coll_index);
1169 * Request sync osd read
1171 static int rbd_req_sync_read(struct rbd_device *dev,
1172 struct ceph_snap_context *snapc,
1179 return rbd_req_sync_op(dev, NULL,
1184 1, obj, ofs, len, buf, NULL, ver);
1188 * Request sync osd watch
1190 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1195 struct ceph_osd_req_op *ops;
1198 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1202 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1203 ops[0].watch.cookie = notify_id;
1204 ops[0].watch.flag = 0;
1206 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1213 rbd_simple_req_cb, 0, NULL);
1215 rbd_destroy_ops(ops);
1219 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1221 struct rbd_device *dev = (struct rbd_device *)data;
1227 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1228 notify_id, (int)opcode);
1229 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1230 rc = __rbd_refresh_header(dev);
1231 mutex_unlock(&ctl_mutex);
1233 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1234 " update snaps: %d\n", dev->major, rc);
1236 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1240 * Request sync osd watch
1242 static int rbd_req_sync_watch(struct rbd_device *dev,
1246 struct ceph_osd_req_op *ops;
1247 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1249 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1253 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1254 (void *)dev, &dev->watch_event);
1258 ops[0].watch.ver = cpu_to_le64(ver);
1259 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1260 ops[0].watch.flag = 1;
1262 ret = rbd_req_sync_op(dev, NULL,
1265 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268 &dev->watch_request, NULL);
1273 rbd_destroy_ops(ops);
1277 ceph_osdc_cancel_event(dev->watch_event);
1278 dev->watch_event = NULL;
1280 rbd_destroy_ops(ops);
1285 * Request sync osd unwatch
1287 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1290 struct ceph_osd_req_op *ops;
1292 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1296 ops[0].watch.ver = 0;
1297 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1298 ops[0].watch.flag = 0;
1300 ret = rbd_req_sync_op(dev, NULL,
1303 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1305 1, obj, 0, 0, NULL, NULL, NULL);
1307 rbd_destroy_ops(ops);
1308 ceph_osdc_cancel_event(dev->watch_event);
1309 dev->watch_event = NULL;
1313 struct rbd_notify_info {
1314 struct rbd_device *dev;
1317 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1319 struct rbd_device *dev = (struct rbd_device *)data;
1323 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1324 notify_id, (int)opcode);
1328 * Request sync osd notify
1330 static int rbd_req_sync_notify(struct rbd_device *dev,
1333 struct ceph_osd_req_op *ops;
1334 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1335 struct ceph_osd_event *event;
1336 struct rbd_notify_info info;
1337 int payload_len = sizeof(u32) + sizeof(u32);
1340 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1346 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1347 (void *)&info, &event);
1351 ops[0].watch.ver = 1;
1352 ops[0].watch.flag = 1;
1353 ops[0].watch.cookie = event->cookie;
1354 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1355 ops[0].watch.timeout = 12;
1357 ret = rbd_req_sync_op(dev, NULL,
1360 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1362 1, obj, 0, 0, NULL, NULL, NULL);
1366 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1367 dout("ceph_osdc_wait_event returned %d\n", ret);
1368 rbd_destroy_ops(ops);
1372 ceph_osdc_cancel_event(event);
1374 rbd_destroy_ops(ops);
1379 * Request sync osd read
1381 static int rbd_req_sync_exec(struct rbd_device *dev,
1389 struct ceph_osd_req_op *ops;
1390 int cls_len = strlen(cls);
1391 int method_len = strlen(method);
1392 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1393 cls_len + method_len + len);
1397 ops[0].cls.class_name = cls;
1398 ops[0].cls.class_len = (__u8)cls_len;
1399 ops[0].cls.method_name = method;
1400 ops[0].cls.method_len = (__u8)method_len;
1401 ops[0].cls.argc = 0;
1402 ops[0].cls.indata = data;
1403 ops[0].cls.indata_len = len;
1405 ret = rbd_req_sync_op(dev, NULL,
1408 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1410 1, obj, 0, 0, NULL, NULL, ver);
1412 rbd_destroy_ops(ops);
1414 dout("cls_exec returned %d\n", ret);
1418 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1420 struct rbd_req_coll *coll =
1421 kzalloc(sizeof(struct rbd_req_coll) +
1422 sizeof(struct rbd_req_status) * num_reqs,
1427 coll->total = num_reqs;
1428 kref_init(&coll->kref);
1433 * block device queue callback
1435 static void rbd_rq_fn(struct request_queue *q)
1437 struct rbd_device *rbd_dev = q->queuedata;
1439 struct bio_pair *bp = NULL;
1441 while ((rq = blk_fetch_request(q))) {
1443 struct bio *rq_bio, *next_bio = NULL;
1445 int size, op_size = 0;
1447 int num_segs, cur_seg = 0;
1448 struct rbd_req_coll *coll;
1450 /* peek at request from block layer */
1454 dout("fetched request\n");
1456 /* filter out block requests we don't understand */
1457 if ((rq->cmd_type != REQ_TYPE_FS)) {
1458 __blk_end_request_all(rq, 0);
1462 /* deduce our operation (read, write) */
1463 do_write = (rq_data_dir(rq) == WRITE);
1465 size = blk_rq_bytes(rq);
1466 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1468 if (do_write && rbd_dev->read_only) {
1469 __blk_end_request_all(rq, -EROFS);
1473 spin_unlock_irq(q->queue_lock);
1475 dout("%s 0x%x bytes at 0x%llx\n",
1476 do_write ? "write" : "read",
1477 size, blk_rq_pos(rq) * SECTOR_SIZE);
1479 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1480 coll = rbd_alloc_coll(num_segs);
1482 spin_lock_irq(q->queue_lock);
1483 __blk_end_request_all(rq, -ENOMEM);
1488 /* a bio clone to be passed down to OSD req */
1489 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1490 op_size = rbd_get_segment(&rbd_dev->header,
1491 rbd_dev->header.object_prefix,
1494 kref_get(&coll->kref);
1495 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1496 op_size, GFP_ATOMIC);
1498 rbd_coll_end_req_index(rq, coll, cur_seg,
1504 /* init OSD command: write or read */
1506 rbd_req_write(rq, rbd_dev,
1507 rbd_dev->header.snapc,
1512 rbd_req_read(rq, rbd_dev,
1525 kref_put(&coll->kref, rbd_coll_release);
1528 bio_pair_release(bp);
1529 spin_lock_irq(q->queue_lock);
1534 * a queue callback. Makes sure that we don't create a bio that spans across
1535 * multiple osd objects. One exception would be with a single page bios,
1536 * which we handle later at bio_chain_clone
1538 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1539 struct bio_vec *bvec)
1541 struct rbd_device *rbd_dev = q->queuedata;
1542 unsigned int chunk_sectors;
1544 unsigned int bio_sectors;
1547 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1548 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1549 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1551 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1552 + bio_sectors)) << SECTOR_SHIFT;
1554 max = 0; /* bio_add cannot handle a negative return */
1555 if (max <= bvec->bv_len && bio_sectors == 0)
1556 return bvec->bv_len;
1560 static void rbd_free_disk(struct rbd_device *rbd_dev)
1562 struct gendisk *disk = rbd_dev->disk;
1567 rbd_header_free(&rbd_dev->header);
1569 if (disk->flags & GENHD_FL_UP)
1572 blk_cleanup_queue(disk->queue);
1577 * reload the ondisk the header
1579 static int rbd_read_header(struct rbd_device *rbd_dev,
1580 struct rbd_image_header *header)
1583 struct rbd_image_header_ondisk *dh;
1589 * First reads the fixed-size header to determine the number
1590 * of snapshots, then re-reads it, along with all snapshot
1591 * records as well as their stored names.
1595 dh = kmalloc(len, GFP_KERNEL);
1599 rc = rbd_req_sync_read(rbd_dev,
1601 rbd_dev->obj_md_name,
1607 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1610 pr_warning("unrecognized header format"
1611 " for image %s", rbd_dev->obj);
1615 if (snap_count == header->total_snaps)
1618 snap_count = header->total_snaps;
1619 len = sizeof (*dh) +
1620 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1621 header->snap_names_len;
1623 rbd_header_free(header);
1626 header->obj_version = ver;
1636 static int rbd_header_add_snap(struct rbd_device *dev,
1637 const char *snap_name,
1640 int name_len = strlen(snap_name);
1645 struct ceph_mon_client *monc;
1647 /* we should create a snapshot only if we're pointing at the head */
1648 if (dev->snap_id != CEPH_NOSNAP)
1651 monc = &dev->rbd_client->client->monc;
1652 ret = ceph_monc_create_snapid(monc, dev->pool_id, &new_snapid);
1653 dout("created snapid=%lld\n", new_snapid);
1657 data = kmalloc(name_len + 16, gfp_flags);
1662 e = data + name_len + 16;
1664 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1665 ceph_encode_64_safe(&p, e, new_snapid, bad);
1667 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1668 data, p - data, &ver);
1675 down_write(&dev->header_rwsem);
1676 dev->header.snapc->seq = new_snapid;
1677 up_write(&dev->header_rwsem);
1684 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1686 struct rbd_snap *snap;
1688 while (!list_empty(&rbd_dev->snaps)) {
1689 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1690 __rbd_remove_snap_dev(rbd_dev, snap);
1695 * only read the first part of the ondisk header, without the snaps info
1697 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1700 struct rbd_image_header h;
1704 ret = rbd_read_header(rbd_dev, &h);
1709 set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1711 down_write(&rbd_dev->header_rwsem);
1713 snap_seq = rbd_dev->header.snapc->seq;
1714 if (rbd_dev->header.total_snaps &&
1715 rbd_dev->header.snapc->snaps[0] == snap_seq)
1716 /* pointing at the head, will need to follow that
1720 /* rbd_dev->header.object_prefix shouldn't change */
1721 kfree(rbd_dev->header.snap_sizes);
1722 kfree(rbd_dev->header.snap_names);
1723 kfree(rbd_dev->header.snapc);
1725 rbd_dev->header.total_snaps = h.total_snaps;
1726 rbd_dev->header.snapc = h.snapc;
1727 rbd_dev->header.snap_names = h.snap_names;
1728 rbd_dev->header.snap_names_len = h.snap_names_len;
1729 rbd_dev->header.snap_sizes = h.snap_sizes;
1730 /* Free the extra copy of the object prefix */
1731 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1732 kfree(h.object_prefix);
1735 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1737 rbd_dev->header.snapc->seq = snap_seq;
1739 ret = __rbd_init_snaps_header(rbd_dev);
1741 up_write(&rbd_dev->header_rwsem);
/*
 * Set up the gendisk and request queue for a freshly added rbd device:
 * read the image header, build the snapshot list, select the mapped
 * snapshot, then allocate/configure the disk and announce it.  Error
 * paths are elided in this extraction.
 */
1746 static int rbd_init_disk(struct rbd_device *rbd_dev)
1748 struct gendisk *disk;
1749 struct request_queue *q;
1754 /* contact OSD, request size info about the object being mapped */
1755 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1759 /* no need to lock here, as rbd_dev is not registered yet */
1760 rc = __rbd_init_snaps_header(rbd_dev);
/* Pick the snapshot named by the user (or head) and get its size. */
1764 rc = rbd_header_set_snap(rbd_dev, &total_size);
1768 /* create gendisk info */
1770 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1774 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1776 disk->major = rbd_dev->major;
1777 disk->first_minor = 0;
1778 disk->fops = &rbd_bd_ops;
1779 disk->private_data = rbd_dev;
/* init rq: requests are serviced by rbd_rq_fn under rbd_dev->lock */
1783 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1787 /* We use the default size, but let's be explicit about it. */
1788 blk_queue_physical_block_size(q, SECTOR_SIZE);
1790 /* set io sizes to object size */
1791 segment_size = rbd_obj_bytes(&rbd_dev->header);
1792 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1793 blk_queue_max_segment_size(q, segment_size);
1794 blk_queue_io_min(q, segment_size);
1795 blk_queue_io_opt(q, segment_size);
/* keep bios from straddling object (segment) boundaries */
1797 blk_queue_merge_bvec(q, rbd_merge_bvec);
1800 q->queuedata = rbd_dev;
1802 rbd_dev->disk = disk;
1805 /* finally, announce the disk to the world */
1806 set_capacity(disk, total_size / SECTOR_SIZE);
1809 pr_info("%s: added with size 0x%llx\n",
1810 disk->disk_name, (unsigned long long)total_size);
1823 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1825 return container_of(dev, struct rbd_device, dev);
1828 static ssize_t rbd_size_show(struct device *dev,
1829 struct device_attribute *attr, char *buf)
1831 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1833 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1836 static ssize_t rbd_major_show(struct device *dev,
1837 struct device_attribute *attr, char *buf)
1839 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1841 return sprintf(buf, "%d\n", rbd_dev->major);
1844 static ssize_t rbd_client_id_show(struct device *dev,
1845 struct device_attribute *attr, char *buf)
1847 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1849 return sprintf(buf, "client%lld\n",
1850 ceph_client_id(rbd_dev->rbd_client->client));
1853 static ssize_t rbd_pool_show(struct device *dev,
1854 struct device_attribute *attr, char *buf)
1856 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1858 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1861 static ssize_t rbd_pool_id_show(struct device *dev,
1862 struct device_attribute *attr, char *buf)
1864 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1866 return sprintf(buf, "%d\n", rbd_dev->pool_id);
1869 static ssize_t rbd_name_show(struct device *dev,
1870 struct device_attribute *attr, char *buf)
1872 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1874 return sprintf(buf, "%s\n", rbd_dev->obj);
1877 static ssize_t rbd_snap_show(struct device *dev,
1878 struct device_attribute *attr,
1881 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1883 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs "refresh" store: force a re-read of the image header from the
 * OSDs under ctl_mutex.  Return-value handling lines are elided here.
 */
1886 static ssize_t rbd_image_refresh(struct device *dev,
1887 struct device_attribute *attr,
1891 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1897 rc = __rbd_refresh_header(rbd_dev);
1901 mutex_unlock(&ctl_mutex);
/*
 * Per-device sysfs attributes.  Read-only attributes expose image
 * metadata; "refresh" and "create_snap" are write-only controls.
 */
1905 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1906 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1907 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1908 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1909 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1910 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1911 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1912 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1913 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/*
 * Attribute table for the per-device sysfs group.  (The terminating
 * NULL entry is elided in this extraction.)
 */
1915 static struct attribute *rbd_attrs[] = {
1916 &dev_attr_size.attr,
1917 &dev_attr_major.attr,
1918 &dev_attr_client_id.attr,
1919 &dev_attr_pool.attr,
1920 &dev_attr_pool_id.attr,
1921 &dev_attr_name.attr,
1922 &dev_attr_current_snap.attr,
1923 &dev_attr_refresh.attr,
1924 &dev_attr_create_snap.attr,
/* Wrap rbd_attrs in a group, and the group in a NULL-terminated list. */
1928 static struct attribute_group rbd_attr_group = {
1932 static const struct attribute_group *rbd_attr_groups[] = {
/* Release callback for the rbd device; body elided in this extraction. */
1937 static void rbd_sysfs_dev_release(struct device *dev)
/*
 * Device type for rbd devices: attaches the sysfs attribute groups
 * and the release callback to every registered rbd device.
 */
1941 static struct device_type rbd_device_type = {
1943 .groups = rbd_attr_groups,
1944 .release = rbd_sysfs_dev_release,
1952 static ssize_t rbd_snap_size_show(struct device *dev,
1953 struct device_attribute *attr,
1956 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1958 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1961 static ssize_t rbd_snap_id_show(struct device *dev,
1962 struct device_attribute *attr,
1965 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1967 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Per-snapshot sysfs attributes (size and id), and their group. */
1970 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1971 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
/* NULL terminator of this table is elided in this extraction. */
1973 static struct attribute *rbd_snap_attrs[] = {
1974 &dev_attr_snap_size.attr,
1975 &dev_attr_snap_id.attr,
1979 static struct attribute_group rbd_snap_attr_group = {
1980 .attrs = rbd_snap_attrs,
/*
 * Release callback for snapshot devices, invoked by the driver core
 * when the last reference to snap->dev is dropped.  The freeing of
 * the snap (and its name) is elided in this extraction — presumably
 * kfree() of both; confirm against the full source.
 */
1983 static void rbd_snap_dev_release(struct device *dev)
1985 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
/* NULL-terminated group list plus device type for snapshot devices. */
1990 static const struct attribute_group *rbd_snap_attr_groups[] = {
1991 &rbd_snap_attr_group,
/*
 * rbd_snap_device_type supplies both the sysfs groups and the release
 * callback for every registered snapshot device.
 */
1995 static struct device_type rbd_snap_device_type = {
1996 .groups = rbd_snap_attr_groups,
1997 .release = rbd_snap_dev_release,
2000 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2001 struct rbd_snap *snap)
2003 list_del(&snap->node);
2004 device_unregister(&snap->dev);
2007 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2008 struct rbd_snap *snap,
2009 struct device *parent)
2011 struct device *dev = &snap->dev;
2014 dev->type = &rbd_snap_device_type;
2015 dev->parent = parent;
2016 dev->release = rbd_snap_dev_release;
2017 dev_set_name(dev, "snap_%s", snap->name);
2018 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header snapshot index @i, copy its name,
 * size and id out of rbd_dev->header, and — if the parent rbd device
 * is already in sysfs — register a snapshot device for it.  The
 * allocated snap is returned through @snapp.  Allocation-failure
 * handling and the function tail are elided in this extraction.
 */
2023 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2024 int i, const char *name,
2025 struct rbd_snap **snapp)
2028 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2031 snap->name = kstrdup(name, GFP_KERNEL);
2032 snap->size = rbd_dev->header.snap_sizes[i];
2033 snap->id = rbd_dev->header.snapc->snaps[i];
/* Only create the sysfs child once the parent device exists. */
2034 if (device_is_registered(&rbd_dev->dev)) {
2035 ret = rbd_register_snap_dev(rbd_dev, snap,
2049 * search for the previous snap in a null delimited string list
/*
 * Step backward from @name to the start of the preceding NUL-delimited
 * string, stopping at @start.  The guard below bails when there is no
 * room for a previous entry; the backward scan itself is elided in
 * this extraction.  NOTE(review): not declared static — consider
 * making it static if it has no users outside this file.
 */
2051 const char *rbd_prev_snap_name(const char *name, const char *start)
2053 if (name < start + 2)
2066 * compare the old list of snapshots that we have to what's in the header
2067 * and update it accordingly. Note that the header holds the snapshots
2068 * in a reverse order (from newest to oldest) and we need to go from
2069 * older to new so that we don't get a duplicate snap name when
2070 * doing the process (e.g., removed snapshot and recreated a new
2071 * one with the same name.
/*
 * Merge the existing rbd_dev->snaps list with the snapshot set in the
 * freshly read header: remove snaps that disappeared, keep matches,
 * and create devices for new snapshots.  Ids in snapc->snaps[] run
 * newest-first, so the walk runs from the list tail (oldest) upward.
 * Several loop-control and error lines are elided in this extraction.
 */
2073 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2075 const char *name, *first_name;
/* i counts down through the header's snapshots (newest to oldest). */
2076 int i = rbd_dev->header.total_snaps;
2077 struct rbd_snap *snap, *old_snap = NULL;
2079 struct list_head *p, *n;
/* name walks backward through the NUL-delimited snap-name blob. */
2081 first_name = rbd_dev->header.snap_names;
2082 name = first_name + rbd_dev->header.snap_names_len;
2084 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2087 old_snap = list_entry(p, struct rbd_snap, node);
2090 cur_id = rbd_dev->header.snapc->snaps[i - 1];
/* Header ran out, or this old id no longer appears: it was deleted. */
2092 if (!i || old_snap->id < cur_id) {
2093 /* old_snap->id was skipped, thus was removed */
2094 __rbd_remove_snap_dev(rbd_dev, old_snap);
2097 if (old_snap->id == cur_id) {
2098 /* we have this snapshot already */
2100 name = rbd_prev_snap_name(name, first_name);
/* Inner loop: insert header snaps newer than old_snap before it. */
2104 i--, name = rbd_prev_snap_name(name, first_name)) {
2109 cur_id = rbd_dev->header.snapc->snaps[i];
2110 /* snapshot removal? handle it above */
2111 if (cur_id >= old_snap->id)
2113 /* a new snapshot */
2114 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2118 /* note that we add it backward so using n and not p */
2119 list_add(&snap->node, n);
2123 /* we're done going over the old snap list, just add what's left */
2124 for (; i > 0; i--) {
2125 name = rbd_prev_snap_name(name, first_name);
2130 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2133 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Register the rbd device on the rbd bus under rbd_root_dev (named by
 * its numeric id), then register a sysfs device for each snapshot
 * already on rbd_dev->snaps.  Runs under ctl_mutex.  Error handling
 * between steps is elided in this extraction.
 */
2139 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2143 struct rbd_snap *snap;
2145 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2146 dev = &rbd_dev->dev;
2148 dev->bus = &rbd_bus_type;
2149 dev->type = &rbd_device_type;
2150 dev->parent = &rbd_root_dev;
2151 dev->release = rbd_dev_release;
/* Device name in sysfs is just the numeric rbd id. */
2152 dev_set_name(dev, "%d", rbd_dev->id);
2153 ret = device_register(dev);
2157 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2158 ret = rbd_register_snap_dev(rbd_dev, snap,
2164 mutex_unlock(&ctl_mutex);
/*
 * Remove the rbd device from sysfs; the final reference drop invokes
 * rbd_dev_release(), which performs the remaining teardown.
 */
2168 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2170 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the image header object so header changes
 * (e.g. new snapshots) notify us.  -ERANGE from the OSD means our
 * header version is stale: refresh it under ctl_mutex and retry
 * (loop head and exit lines are elided in this extraction).
 */
2173 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2178 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2179 rbd_dev->header.obj_version);
2180 if (ret == -ERANGE) {
2181 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2182 rc = __rbd_refresh_header(rbd_dev);
2183 mutex_unlock(&ctl_mutex);
2187 } while (ret == -ERANGE);
/* Highest rbd id handed out so far; ids start at 1 (see rbd_id_get). */
2192 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2195 * Get a unique rbd identifier for the given new rbd_dev, and add
2196 * the rbd_dev to the global list. The minimum rbd id is 1.
2198 static void rbd_id_get(struct rbd_device *rbd_dev)
/* atomic64_inc_return makes the first id 1, never 0. */
2200 rbd_dev->id = atomic64_inc_return(&rbd_id_max);
/* rbd_dev_list is protected by rbd_dev_list_lock. */
2202 spin_lock(&rbd_dev_list_lock);
2203 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2204 spin_unlock(&rbd_dev_list_lock);
2208 * Remove an rbd_dev from the global list, and record that its
2209 * identifier is no longer in use.
2211 static void rbd_id_put(struct rbd_device *rbd_dev)
2213 struct list_head *tmp;
2214 int rbd_id = rbd_dev->id;
2219 spin_lock(&rbd_dev_list_lock);
2220 list_del_init(&rbd_dev->node);
2223 * If the id being "put" is not the current maximum, there
2224 * is nothing special we need to do.
2226 if (rbd_id != atomic64_read(&rbd_id_max)) {
2227 spin_unlock(&rbd_dev_list_lock);
2232 * We need to update the current maximum id. Search the
2233 * list to find out what it is. We're more likely to find
2234 * the maximum at the end, so search the list backward.
2237 list_for_each_prev(tmp, &rbd_dev_list) {
2238 struct rbd_device *rbd_dev;
2240 rbd_dev = list_entry(tmp, struct rbd_device, node);
2241 if (rbd_id > max_id)
2244 spin_unlock(&rbd_dev_list_lock);
2247 * The max id could have been updated by rbd_id_get(), in
2248 * which case it now accurately reflects the new maximum.
2249 * Be careful not to overwrite the maximum value in that
2252 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
/*
 * Advance *buf past leading white space to the first non-space
 * character, and return the length of the token that starts there
 * (0 if the string is exhausted).  *buf must be NUL-terminated.
 */
static inline size_t next_token(const char **buf)
{
	/* Characters isspace() matches in the "C" and "POSIX" locales. */
	static const char spaces[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, spaces);

	*buf = start;				/* skip leading white space */
	return strcspn(start, spaces);		/* token length */
}
/*
 * Find the next token in *buf and, if @token_size is big enough,
 * copy it NUL-terminated into @token.  Returns the token length
 * (0 if none found; >= token_size means it did not fit and was not
 * copied).  *buf is advanced past the token either way; it must be
 * NUL-terminated on entry.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);

	if (len < token_size) {
		memcpy(token, *buf, len);
		token[len] = '\0';
	}
	*buf += len;

	return len;
}
2305 * Finds the next token in *buf, dynamically allocates a buffer big
2306 * enough to hold a copy of it, and copies the token into the new
2307 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2308 * that a duplicate buffer is created even for a zero-length token.
2310 * Returns a pointer to the newly-allocated duplicate, or a null
2311 * pointer if memory for the duplicate was not available. If
2312 * the lenp argument is a non-null pointer, the length of the token
2313 * (not including the '\0') is returned in *lenp.
2315 * If successful, the *buf pointer will be updated to point beyond
2316 * the end of the found token.
2318 * Note: uses GFP_KERNEL for allocation.
2320 static inline char *dup_token(const char **buf, size_t *lenp)
2325 len = next_token(buf);
2326 dup = kmalloc(len + 1, GFP_KERNEL);
2330 memcpy(dup, *buf, len);
2331 *(dup + len) = '\0';
2341 * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2342 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2343 * on the list of monitor addresses and other options provided via
2346 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * Parse the "add" command: <mon_addrs> <options> <pool> <image>
 * [<snap>].  On success the duplicated strings belong to rbd_dev and
 * are freed by the error path below (or later by rbd_dev_release).
 * Several error checks are elided in this extraction.
 */
2348 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2350 const char **mon_addrs,
2351 size_t *mon_addrs_size,
2353 size_t options_size)
2358 /* The first four tokens are required */
/* Monitor addresses are returned as a pointer into buf, not copied. */
2360 len = next_token(&buf);
2363 *mon_addrs_size = len + 1;
2368 len = copy_token(&buf, options, options_size);
2369 if (!len || len >= options_size)
2373 rbd_dev->pool_name = dup_token(&buf, NULL);
2374 if (!rbd_dev->pool_name)
2377 rbd_dev->obj = dup_token(&buf, &rbd_dev->obj_len);
2381 /* Create the name of the header object */
/* sizeof(RBD_SUFFIX) includes the NUL terminator. */
2383 rbd_dev->obj_md_name = kmalloc(rbd_dev->obj_len
2384 + sizeof (RBD_SUFFIX),
2386 if (!rbd_dev->obj_md_name)
2388 sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2391 * The snapshot name is optional. If none is supplied,
2392 * we use the default value.
2394 rbd_dev->snap_name = dup_token(&buf, &len);
2395 if (!rbd_dev->snap_name)
2398 /* Replace the empty name with the default */
2399 kfree(rbd_dev->snap_name);
2401 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2402 if (!rbd_dev->snap_name)
2405 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2406 sizeof (RBD_SNAP_HEAD_NAME));
/* Error unwind: free everything duplicated above, newest first. */
2412 kfree(rbd_dev->obj_md_name);
2413 kfree(rbd_dev->obj);
2414 kfree(rbd_dev->pool_name);
2415 rbd_dev->pool_name = NULL;
/*
 * Bus "add" store: create and map a new rbd device from a command of
 * the form "<mon_addrs> <options> <pool> <image> [<snap>]".  Builds
 * the rbd_device, connects a ceph client, registers the block device
 * and sysfs nodes, then sets up the disk and header watch.  Error
 * labels and several checks are elided in this extraction.
 */
2420 static ssize_t rbd_add(struct bus_type *bus,
2425 struct rbd_device *rbd_dev = NULL;
2426 const char *mon_addrs = NULL;
2427 size_t mon_addrs_size = 0;
2428 struct ceph_osd_client *osdc;
/* Hold a module reference for the lifetime of the device. */
2431 if (!try_module_get(THIS_MODULE))
2434 options = kmalloc(count, GFP_KERNEL);
2437 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2441 /* static rbd_device initialization */
2442 spin_lock_init(&rbd_dev->lock);
2443 INIT_LIST_HEAD(&rbd_dev->node);
2444 INIT_LIST_HEAD(&rbd_dev->snaps);
2445 init_rwsem(&rbd_dev->header_rwsem);
/* NOTE(review): duplicate init_rwsem of the same rwsem — the line
 * above already initialized header_rwsem; this repeat is redundant
 * (likely a merge artifact) and should be removed. */
2447 init_rwsem(&rbd_dev->header_rwsem);
2449 /* generate unique id: find highest unique id, add one */
2450 rbd_id_get(rbd_dev);
2452 /* Fill in the device name, now that we have its id. */
2453 BUILD_BUG_ON(DEV_NAME_LEN
2454 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2455 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2457 /* parse add command */
2458 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* mon_addrs_size counts the trailing NUL; pass the bare length. */
2463 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2465 if (IS_ERR(rbd_dev->rbd_client)) {
2466 rc = PTR_ERR(rbd_dev->rbd_client);
/* Resolve the pool name to its numeric id via the current osdmap. */
2471 osdc = &rbd_dev->rbd_client->client->osdc;
2472 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2474 goto err_out_client;
2475 rbd_dev->pool_id = rc;
2477 /* register our block device */
2478 rc = register_blkdev(0, rbd_dev->name);
2480 goto err_out_client;
2481 rbd_dev->major = rc;
2483 rc = rbd_bus_add_dev(rbd_dev);
2485 goto err_out_blkdev;
2488 * At this point cleanup in the event of an error is the job
2489 * of the sysfs code (initiated by rbd_bus_del_dev()).
2491 * Set up and announce blkdev mapping.
2493 rc = rbd_init_disk(rbd_dev);
2497 rc = rbd_init_watch_dev(rbd_dev);
/* Error unwind (labels elided): sysfs teardown frees the rest. */
2504 /* this will also clean up rest of rbd_dev stuff */
2506 rbd_bus_del_dev(rbd_dev);
2511 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2513 rbd_put_client(rbd_dev);
/* pool_name non-NULL means parse_args succeeded; free its strings. */
2515 if (rbd_dev->pool_name) {
2516 kfree(rbd_dev->snap_name);
2517 kfree(rbd_dev->obj_md_name);
2518 kfree(rbd_dev->obj);
2519 kfree(rbd_dev->pool_name);
2521 rbd_id_put(rbd_dev);
2526 dout("Error adding device %s\n", buf);
2527 module_put(THIS_MODULE);
2529 return (ssize_t) rc;
2532 static struct rbd_device *__rbd_get_dev(unsigned long id)
2534 struct list_head *tmp;
2535 struct rbd_device *rbd_dev;
2537 spin_lock(&rbd_dev_list_lock);
2538 list_for_each(tmp, &rbd_dev_list) {
2539 rbd_dev = list_entry(tmp, struct rbd_device, node);
2540 if (rbd_dev->id == id) {
2541 spin_unlock(&rbd_dev_list_lock);
2545 spin_unlock(&rbd_dev_list_lock);
/*
 * Final teardown for an rbd device, invoked by the driver core after
 * the last reference to rbd_dev->dev is dropped (see rbd_bus_add_dev,
 * which installs this as dev->release): cancel the header watch, drop
 * the ceph client, free the disk, and release the id and strings.
 */
2549 static void rbd_dev_release(struct device *dev)
2551 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
/* Stop the lingering watch request on the header object, if any. */
2553 if (rbd_dev->watch_request) {
2554 struct ceph_client *client = rbd_dev->rbd_client->client;
2556 ceph_osdc_unregister_linger_request(&client->osdc,
2557 rbd_dev->watch_request);
2559 if (rbd_dev->watch_event)
2560 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2562 rbd_put_client(rbd_dev);
2564 /* clean up and free blkdev */
2565 rbd_free_disk(rbd_dev);
2566 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2568 /* done with the id, and with the rbd_dev */
2569 kfree(rbd_dev->snap_name);
2570 kfree(rbd_dev->obj_md_name);
2571 kfree(rbd_dev->pool_name);
2572 kfree(rbd_dev->obj);
2573 rbd_id_put(rbd_dev);
2576 /* release module ref */
2577 module_put(THIS_MODULE);
/*
 * Bus "remove" store: parse the numeric device id from @buf, find
 * the matching device under ctl_mutex, tear down its snapshots and
 * unregister it (rbd_dev_release finishes the cleanup).  Not-found
 * handling and the return value are elided in this extraction.
 */
2580 static ssize_t rbd_remove(struct bus_type *bus,
2584 struct rbd_device *rbd_dev = NULL;
2589 rc = strict_strtoul(buf, 10, &ul);
2593 /* convert to int; abort if we lost anything in the conversion */
2594 target_id = (int) ul;
2595 if (target_id != ul)
2598 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2600 rbd_dev = __rbd_get_dev(target_id);
2606 __rbd_remove_all_snaps(rbd_dev);
2607 rbd_bus_del_dev(rbd_dev);
2610 mutex_unlock(&ctl_mutex);
/*
 * Device "create_snap" store: copy the requested snapshot name, ask
 * the OSDs to create the snapshot, refresh the header, then notify
 * watchers.  Error labels are elided in this extraction.
 */
2614 static ssize_t rbd_snap_add(struct device *dev,
2615 struct device_attribute *attr,
2619 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2621 char *name = kmalloc(count + 1, GFP_KERNEL);
/* NOTE(review): size is 'count', not 'count + 1', so the final input
 * byte is dropped — this only works because sysfs input normally ends
 * in '\n'; a name with no trailing newline loses its last character.
 * Verify against callers / consider snprintf(name, count + 1, ...). */
2625 snprintf(name, count, "%s", buf);
2627 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2629 ret = rbd_header_add_snap(rbd_dev,
2634 ret = __rbd_refresh_header(rbd_dev);
2638 /* shouldn't hold ctl_mutex when notifying.. notify might
2639 trigger a watch callback that would need to get that mutex */
2640 mutex_unlock(&ctl_mutex);
2642 /* make a best effort, don't error if failed */
2643 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
/* Error-path unlock (reached via elided goto label). */
2650 mutex_unlock(&ctl_mutex);
2656 * create control files in sysfs
/*
 * Register the rbd root device and the rbd bus; on bus registration
 * failure the root device is unwound.  Intermediate error checks are
 * elided in this extraction.
 */
2659 static int rbd_sysfs_init(void)
2663 ret = device_register(&rbd_root_dev);
2667 ret = bus_register(&rbd_bus_type);
2669 device_unregister(&rbd_root_dev);
/* Tear down sysfs state in reverse order of rbd_sysfs_init(). */
2674 static void rbd_sysfs_cleanup(void)
2676 bus_unregister(&rbd_bus_type);
2677 device_unregister(&rbd_root_dev);
/* Module entry: set up sysfs; devices are created later via "add". */
2680 int __init rbd_init(void)
2684 rc = rbd_sysfs_init();
2687 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files. */
2691 void __exit rbd_exit(void)
2693 rbd_sysfs_cleanup();
2696 module_init(rbd_init);
2697 module_exit(rbd_exit);
2699 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2700 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2701 MODULE_DESCRIPTION("rados block device");
2703 /* following authorship retained from original osdblk.c */
2704 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2706 MODULE_LICENSE("GPL");