2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
44 #define DRV_NAME "rbd"
45 #define DRV_NAME_LONG "rbd (rados block device)"
47 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
49 #define RBD_MAX_MD_NAME_LEN (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
50 #define RBD_MAX_POOL_NAME_LEN 64
51 #define RBD_MAX_SNAP_NAME_LEN 32
52 #define RBD_MAX_OPT_LEN 1024
/* snap name used when the mapping points at the image head (no snapshot) */
54 #define RBD_SNAP_HEAD_NAME "-"
56 #define DEV_NAME_LEN 32
/* default notify timeout (seconds), overridable via the notify_timeout= option */
58 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
/* NOTE(review): this extraction is garbled — each line is prefixed with its
 * original source line number and many lines are missing, so the struct
 * definitions below are incomplete fragments. Do not edit structurally
 * without the original file. */
61 * block device image metadata (in-memory version)
63 struct rbd_image_header {
69 struct rw_semaphore snap_rwsem;
70 struct ceph_snap_context *snapc;
71 size_t snap_names_len;
/* rbd_client: one ceph client instance, shareable by multiple devices */
86 * an instance of the client. multiple devices may share a client.
89 struct ceph_client *client;
90 struct rbd_options *rbd_opts;
92 struct list_head node;
/* rbd_request: per-OSD-request bookkeeping passed to the completion callback */
101 struct request *rq; /* blk layer request */
102 struct bio *bio; /* cloned bio */
103 struct page **pages; /* list of used pages */
106 struct rbd_req_coll *coll;
/* per-segment completion status inside a request collection */
109 struct rbd_req_status {
116 * a collection of requests
118 struct rbd_req_coll {
122 struct rbd_req_status status[0];
129 struct list_head node;
/* rbd_device: one mapped rbd image, i.e. one /dev/rbdN block device */
137 int id; /* blkdev unique id */
139 int major; /* blkdev assigned major */
140 struct gendisk *disk; /* blkdev's gendisk and rq */
141 struct request_queue *q;
143 struct rbd_client *rbd_client;
145 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
147 spinlock_t lock; /* queue lock */
149 struct rbd_image_header header;
150 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
152 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
153 char pool_name[RBD_MAX_POOL_NAME_LEN];
156 struct ceph_osd_event *watch_event;
157 struct ceph_osd_request *watch_request;
159 char snap_name[RBD_MAX_SNAP_NAME_LEN];
160 u32 cur_snap; /* index+1 of current snapshot within snap context
164 struct list_head node;
166 /* list of snapshots */
167 struct list_head snaps;
173 static struct bus_type rbd_bus_type = {
177 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
/* global device list, protected by rbd_dev_list_lock */
179 static LIST_HEAD(rbd_dev_list); /* devices */
180 static DEFINE_SPINLOCK(rbd_dev_list_lock);
/* global client list, protected by node_lock */
182 static LIST_HEAD(rbd_client_list); /* clients */
183 static DEFINE_SPINLOCK(node_lock); /* protects client get/put */
/* forward declarations for snapshot/sysfs helpers defined later in the file */
185 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
186 static void rbd_dev_release(struct device *dev);
187 static ssize_t rbd_snap_add(struct device *dev,
188 struct device_attribute *attr,
191 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
192 struct rbd_snap *snap);
195 static struct rbd_device *dev_to_rbd(struct device *dev)
197 return container_of(dev, struct rbd_device, dev);
200 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
202 return get_device(&rbd_dev->dev);
205 static void rbd_put_dev(struct rbd_device *rbd_dev)
207 put_device(&rbd_dev->dev);
210 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
212 static int rbd_open(struct block_device *bdev, fmode_t mode)
214 struct gendisk *disk = bdev->bd_disk;
215 struct rbd_device *rbd_dev = disk->private_data;
217 rbd_get_dev(rbd_dev);
219 set_device_ro(bdev, rbd_dev->read_only);
221 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
227 static int rbd_release(struct gendisk *disk, fmode_t mode)
229 struct rbd_device *rbd_dev = disk->private_data;
231 rbd_put_dev(rbd_dev);
236 static const struct block_device_operations rbd_bd_ops = {
237 .owner = THIS_MODULE,
239 .release = rbd_release,
243 * Initialize an rbd client instance.
/*
 * Allocate and initialize an rbd_client: create the libceph client,
 * open a monitor session, and add it to the global client list under
 * node_lock.  On success ownership of @opt passes to the ceph client.
 * NOTE(review): extraction is missing several lines (error checks and
 * labels) of this function.
 */
246 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
247 struct rbd_options *rbd_opts)
249 struct rbd_client *rbdc;
252 dout("rbd_client_create\n");
253 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
257 kref_init(&rbdc->kref);
258 INIT_LIST_HEAD(&rbdc->node);
260 rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
261 if (IS_ERR(rbdc->client))
263 opt = NULL; /* Now rbdc->client is responsible for opt */
265 ret = ceph_open_session(rbdc->client);
269 rbdc->rbd_opts = rbd_opts;
271 spin_lock(&node_lock);
272 list_add_tail(&rbdc->node, &rbd_client_list);
273 spin_unlock(&node_lock);
275 dout("rbd_client_create created %p\n", rbdc);
/* error unwind: destroy the partially constructed client / options */
279 ceph_destroy_client(rbdc->client);
284 ceph_destroy_options(opt);
289 * Find a ceph client with specific addr and configuration.
291 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
293 struct rbd_client *client_node;
295 if (opt->flags & CEPH_OPT_NOSHARE)
298 list_for_each_entry(client_node, &rbd_client_list, node)
299 if (ceph_compare_options(opt, client_node->client) == 0)
/* mount-option token table consumed by parse_rbd_opts_token() below.
 * NOTE(review): the enum that defines Opt_* is missing from this
 * extraction. */
312 /* string args above */
315 static match_table_t rbdopt_tokens = {
316 {Opt_notify_timeout, "notify_timeout=%d"},
318 /* string args above */
322 static int parse_rbd_opts_token(char *c, void *private)
324 struct rbd_options *rbdopt = private;
325 substring_t argstr[MAX_OPT_ARGS];
326 int token, intval, ret;
328 token = match_token(c, rbdopt_tokens, argstr);
332 if (token < Opt_last_int) {
333 ret = match_int(&argstr[0], &intval);
335 pr_err("bad mount option arg (not int) "
339 dout("got int token %d val %d\n", token, intval);
340 } else if (token > Opt_last_int && token < Opt_last_string) {
341 dout("got string token %d val %s\n", token,
344 dout("got token %d\n", token);
348 case Opt_notify_timeout:
349 rbdopt->notify_timeout = intval;
358 * Get a ceph client with specific addr and configuration, if one does
359 * not exist create it.
/*
 * Get a ceph client for this device: reuse a matching client from the
 * global list (under node_lock) or create a new one.  On success
 * rbd_dev->rbd_client holds a reference.  NOTE(review): extraction is
 * missing the error-handling lines of this function.
 */
361 static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
364 struct rbd_client *rbdc;
365 struct ceph_options *opt;
367 struct rbd_options *rbd_opts;
369 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
373 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
375 opt = ceph_parse_options(options, mon_addr,
376 mon_addr + strlen(mon_addr),
377 parse_rbd_opts_token, rbd_opts);
383 spin_lock(&node_lock);
384 rbdc = __rbd_client_find(opt);
/* found a shareable client: take a ref and drop our parsed options */
386 ceph_destroy_options(opt);
389 /* using an existing client */
390 kref_get(&rbdc->kref);
391 rbd_dev->rbd_client = rbdc;
392 spin_unlock(&node_lock);
395 spin_unlock(&node_lock);
/* no match: create a brand-new client (takes ownership of opt) */
397 rbdc = rbd_client_create(opt, rbd_opts);
403 rbd_dev->rbd_client = rbdc;
411 * Destroy ceph client
413 * Caller must hold node_lock.
415 static void rbd_client_release(struct kref *kref)
417 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
419 dout("rbd_release_client %p\n", rbdc);
420 list_del(&rbdc->node);
422 ceph_destroy_client(rbdc->client);
423 kfree(rbdc->rbd_opts);
428 * Drop reference to ceph client node. If it's not referenced anymore, release
431 static void rbd_put_client(struct rbd_device *rbd_dev)
433 spin_lock(&node_lock);
434 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
435 spin_unlock(&node_lock);
436 rbd_dev->rbd_client = NULL;
440 * Destroy requests collection
442 static void rbd_coll_release(struct kref *kref)
444 struct rbd_req_coll *coll =
445 container_of(kref, struct rbd_req_coll, kref);
447 dout("rbd_coll_release %p\n", coll);
452 * Create a new header structure, translate header format from the on-disk
/*
 * Translate the on-disk image header into the in-memory
 * rbd_image_header, allocating the snap context, the snapshot-name
 * blob and the per-snapshot size array.  NOTE(review): extraction is
 * missing several lines here (parameters, error returns, labels).
 */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456 struct rbd_image_header_ondisk *ondisk,
461 u32 snap_count = le32_to_cpu(ondisk->snap_count);
/* reject anything without the RBD header magic text */
464 if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
467 init_rwsem(&header->snap_rwsem);
468 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470 snap_count * sizeof (*ondisk),
475 header->snap_names = kmalloc(header->snap_names_len,
477 if (!header->snap_names)
479 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
481 if (!header->snap_sizes)
484 header->snap_names = NULL;
485 header->snap_sizes = NULL;
487 memcpy(header->block_name, ondisk->block_name,
488 sizeof(ondisk->block_name));
490 header->image_size = le64_to_cpu(ondisk->image_size);
491 header->obj_order = ondisk->options.order;
492 header->crypt_type = ondisk->options.crypt_type;
493 header->comp_type = ondisk->options.comp_type;
495 atomic_set(&header->snapc->nref, 1);
496 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
497 header->snapc->num_snaps = snap_count;
498 header->total_snaps = snap_count;
/* only copy ids/sizes/names when the caller preallocated room for
 * exactly this many snapshots */
500 if (snap_count && allocated_snaps == snap_count) {
501 for (i = 0; i < snap_count; i++) {
502 header->snapc->snaps[i] =
503 le64_to_cpu(ondisk->snaps[i].id);
504 header->snap_sizes[i] =
505 le64_to_cpu(ondisk->snaps[i].image_size);
508 /* copy snapshot names */
509 memcpy(header->snap_names, &ondisk->snaps[i],
510 header->snap_names_len);
/* error unwind: free what was allocated above */
516 kfree(header->snap_names);
518 kfree(header->snapc);
522 static int snap_index(struct rbd_image_header *header, int snap_num)
524 return header->total_snaps - snap_num;
527 static u64 cur_snap_id(struct rbd_device *rbd_dev)
529 struct rbd_image_header *header = &rbd_dev->header;
531 if (!rbd_dev->cur_snap)
534 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
537 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
541 char *p = header->snap_names;
543 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
544 if (strcmp(snap_name, p) == 0)
547 if (i == header->total_snaps)
550 *seq = header->snapc->snaps[i];
553 *size = header->snap_sizes[i];
/*
 * Point the device at either the image head or the named snapshot,
 * setting snapc->seq accordingly and returning the mapped size via
 * *size.  Takes snap_rwsem for write.  NOTE(review): extraction is
 * missing several lines (read_only handling, error path).
 */
558 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
560 struct rbd_image_header *header = &dev->header;
561 struct ceph_snap_context *snapc = header->snapc;
564 BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));
566 down_write(&header->snap_rwsem);
/* mapping the head: follow the header's snap_seq */
568 if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
569 sizeof (RBD_SNAP_HEAD_NAME))) {
570 if (header->total_snaps)
571 snapc->seq = header->snap_seq;
577 *size = header->image_size;
/* otherwise resolve the snapshot by name */
579 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
583 dev->cur_snap = header->total_snaps - ret;
589 up_write(&header->snap_rwsem);
593 static void rbd_header_free(struct rbd_image_header *header)
595 kfree(header->snapc);
596 kfree(header->snap_names);
597 kfree(header->snap_sizes);
601 * get the actual striped segment name, offset and length
603 static u64 rbd_get_segment(struct rbd_image_header *header,
604 const char *block_name,
606 char *seg_name, u64 *segofs)
608 u64 seg = ofs >> header->obj_order;
611 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
612 "%s.%012llx", block_name, seg);
614 ofs = ofs & ((1 << header->obj_order) - 1);
615 len = min_t(u64, len, (1 << header->obj_order) - ofs);
623 static int rbd_get_num_segments(struct rbd_image_header *header,
626 u64 start_seg = ofs >> header->obj_order;
627 u64 end_seg = (ofs + len - 1) >> header->obj_order;
628 return end_seg - start_seg + 1;
632 * returns the size of an object in the image
634 static u64 rbd_obj_bytes(struct rbd_image_header *header)
636 return 1 << header->obj_order;
643 static void bio_chain_put(struct bio *chain)
649 chain = chain->bi_next;
655 * zeros a bio chain, starting at specific offset
657 static void zero_bio_chain(struct bio *chain, int start_ofs)
666 bio_for_each_segment(bv, chain, i) {
667 if (pos + bv->bv_len > start_ofs) {
668 int remainder = max(start_ofs - pos, 0);
669 buf = bvec_kmap_irq(bv, &flags);
670 memset(buf + remainder, 0,
671 bv->bv_len - remainder);
672 bvec_kunmap_irq(buf, &flags);
677 chain = chain->bi_next;
682 * bio_chain_clone - clone a chain of bios up to a certain length.
683 * might return a bio_pair that will need to be released.
/*
 * Clone up to @len bytes of the bio chain at *old into a new chain,
 * possibly splitting the final bio (single-page bios only); *bp then
 * holds a bio_pair the caller must release.  *old/*next are advanced
 * so the caller can clone the remainder next call.  NOTE(review):
 * extraction is missing many lines (loop/error skeleton).
 */
685 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
686 struct bio_pair **bp,
687 int len, gfp_t gfpmask)
689 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
/* release a bio_pair left over from a previous split */
693 bio_pair_release(*bp);
697 while (old_chain && (total < len)) {
698 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
702 if (total + old_chain->bi_size > len) {
706 * this split can only happen with a single paged bio,
707 * split_bio will BUG_ON if this is not the case
709 dout("bio_chain_clone split! total=%d remaining=%d"
711 (int)total, (int)len-total,
712 (int)old_chain->bi_size);
714 /* split the bio. We'll release it either in the next
715 call, or it will have to be released outside */
716 bp = bio_split(old_chain, (len - total) / 512ULL);
720 __bio_clone(tmp, &bp->bio1);
/* no split needed: clone this bio whole and move down the chain */
724 __bio_clone(tmp, old_chain);
725 *next = old_chain->bi_next;
729 gfpmask &= ~__GFP_WAIT;
733 new_chain = tail = tmp;
738 old_chain = old_chain->bi_next;
740 total += tmp->bi_size;
746 tail->bi_next = NULL;
/* allocation failed mid-way: free the partially built clone chain */
753 dout("bio_chain_clone with err\n");
754 bio_chain_put(new_chain);
759 * helpers for osd request op vectors.
761 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
766 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
770 (*ops)[0].op = opcode;
772 * op extent offset and length will be set later on
773 * in calc_raw_layout()
775 (*ops)[0].payload_len = payload_len;
/* Free an op vector allocated by rbd_create_rw_ops(). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
/*
 * Record completion for segment @index of a request collection and
 * complete, in order, every leading run of finished segments of the
 * block request.  Without a collection the request is ended directly.
 * NOTE(review): extraction is missing several lines (declarations,
 * early return, loop increments).
 */
784 static void rbd_coll_end_req_index(struct request *rq,
785 struct rbd_req_coll *coll,
789 struct request_queue *q;
792 dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
793 coll, index, ret, len);
/* no collection: finish the block-layer request immediately */
799 blk_end_request(rq, ret, len);
805 spin_lock_irq(q->queue_lock);
806 coll->status[index].done = 1;
807 coll->status[index].rc = ret;
808 coll->status[index].bytes = len;
/* advance over the contiguous run of completed segments */
809 max = min = coll->num_done;
810 while (max < coll->total && coll->status[max].done)
813 for (i = min; i<max; i++) {
814 __blk_end_request(rq, coll->status[i].rc,
815 coll->status[i].bytes);
817 kref_put(&coll->kref, rbd_coll_release);
819 spin_unlock_irq(q->queue_lock);
822 static void rbd_coll_end_req(struct rbd_request *req,
825 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
829 * Send ceph osd request
/*
 * Build and submit one OSD request covering [ofs, ofs+len) of object
 * @obj.  Asynchronous when @rbd_cb is set (the callback then owns
 * cleanup of req_data/bio); otherwise waits for completion and
 * reports the reasserted version via *ver.  NOTE(review): extraction
 * is missing many lines (parameters, declarations, error paths).
 */
831 static int rbd_do_request(struct request *rq,
832 struct rbd_device *dev,
833 struct ceph_snap_context *snapc,
835 const char *obj, u64 ofs, u64 len,
840 struct ceph_osd_req_op *ops,
842 struct rbd_req_coll *coll,
844 void (*rbd_cb)(struct ceph_osd_request *req,
845 struct ceph_msg *msg),
846 struct ceph_osd_request **linger_req,
849 struct ceph_osd_request *req;
850 struct ceph_file_layout *layout;
853 struct timespec mtime = CURRENT_TIME;
854 struct rbd_request *req_data;
855 struct ceph_osd_request_head *reqhead;
856 struct rbd_image_header *header = &dev->header;
857 struct ceph_osd_client *osdc;
859 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* allocation failed: report completion for this segment and bail */
862 rbd_coll_end_req_index(rq, coll, coll_index,
868 req_data->coll = coll;
869 req_data->coll_index = coll_index;
872 dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
874 down_read(&header->snap_rwsem);
876 osdc = &dev->rbd_client->client->osdc;
877 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
878 false, GFP_NOIO, pages, bio);
880 up_read(&header->snap_rwsem);
885 req->r_callback = rbd_cb;
889 req_data->pages = pages;
892 req->r_priv = req_data;
894 reqhead = req->r_request->front.iov_base;
895 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
897 strncpy(req->r_oid, obj, sizeof(req->r_oid));
898 req->r_oid_len = strlen(req->r_oid);
/* describe a trivial layout: whole object on one OSD object */
900 layout = &req->r_file_layout;
901 memset(layout, 0, sizeof(*layout));
902 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
903 layout->fl_stripe_count = cpu_to_le32(1);
904 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
905 layout->fl_pg_preferred = cpu_to_le32(-1);
906 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
907 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
910 ceph_osdc_build_request(req, ofs, &len,
914 req->r_oid, req->r_oid_len);
915 up_read(&header->snap_rwsem);
918 ceph_osdc_set_request_linger(osdc, req);
922 ret = ceph_osdc_start_request(osdc, req, false);
/* synchronous path: wait and report the reasserted version */
927 ret = ceph_osdc_wait_request(osdc, req);
929 *ver = le64_to_cpu(req->r_reassert_version.version);
930 dout("reassert_ver=%lld\n",
931 le64_to_cpu(req->r_reassert_version.version));
932 ceph_osdc_put_request(req);
/* error unwind */
937 bio_chain_put(req_data->bio);
938 ceph_osdc_put_request(req);
940 rbd_coll_end_req(req_data, ret, len);
946 * Ceph osd op callback
948 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
950 struct rbd_request *req_data = req->r_priv;
951 struct ceph_osd_reply_head *replyhead;
952 struct ceph_osd_op *op;
958 replyhead = msg->front.iov_base;
959 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
960 op = (void *)(replyhead + 1);
961 rc = le32_to_cpu(replyhead->result);
962 bytes = le64_to_cpu(op->extent.length);
963 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
965 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
967 if (rc == -ENOENT && read_op) {
968 zero_bio_chain(req_data->bio, 0);
970 } else if (rc == 0 && read_op && bytes < req_data->len) {
971 zero_bio_chain(req_data->bio, bytes);
972 bytes = req_data->len;
975 rbd_coll_end_req(req_data, rc, bytes);
978 bio_chain_put(req_data->bio);
980 ceph_osdc_put_request(req);
/* Minimal completion callback: just drop the request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
990 * Do a synchronous ceph osd operation
/*
 * Synchronous OSD op helper: allocates a page vector for the data,
 * builds a single op (unless @orig_ops is supplied), issues the
 * request via rbd_do_request() and copies data to/from @buf.
 * NOTE(review): extraction is missing several lines (parameters,
 * error checks, labels).
 */
992 static int rbd_req_sync_op(struct rbd_device *dev,
993 struct ceph_snap_context *snapc,
997 struct ceph_osd_req_op *orig_ops,
1002 struct ceph_osd_request **linger_req,
1006 struct page **pages;
1008 struct ceph_osd_req_op *ops = orig_ops;
1011 num_pages = calc_pages_for(ofs , len);
1012 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1014 return PTR_ERR(pages);
/* build a single op when the caller did not provide one */
1017 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1018 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
/* writes: stage the caller's buffer into the page vector */
1022 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1023 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1029 ret = rbd_do_request(NULL, dev, snapc, snapid,
1030 obj, ofs, len, NULL,
/* reads: copy the returned bytes back out to the caller's buffer */
1041 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1042 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1046 rbd_destroy_ops(ops);
1048 ceph_release_page_vector(pages, num_pages);
1053 * Do an asynchronous ceph osd operation
/*
 * Asynchronous per-segment OSD op: map (ofs, len) onto one object
 * segment, build the op and submit it via rbd_do_request() with
 * rbd_req_cb as completion.  NOTE(review): extraction is missing
 * several lines (parameters, declarations, cleanup).
 */
1055 static int rbd_do_op(struct request *rq,
1056 struct rbd_device *rbd_dev ,
1057 struct ceph_snap_context *snapc,
1059 int opcode, int flags, int num_reply,
1062 struct rbd_req_coll *coll,
1069 struct ceph_osd_req_op *ops;
1072 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1076 seg_len = rbd_get_segment(&rbd_dev->header,
1077 rbd_dev->header.block_name,
1079 seg_name, &seg_ofs);
1081 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1083 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1087 /* we've taken care of segment sizes earlier when we
1088 cloned the bios. We should never have a segment
1089 truncated at this point */
1090 BUG_ON(seg_len < len);
1092 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1093 seg_name, seg_ofs, seg_len,
1100 rbd_req_cb, 0, NULL);
1102 rbd_destroy_ops(ops);
1109 * Request async osd write
1111 static int rbd_req_write(struct request *rq,
1112 struct rbd_device *rbd_dev,
1113 struct ceph_snap_context *snapc,
1116 struct rbd_req_coll *coll,
1119 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1121 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1123 ofs, len, bio, coll, coll_index);
1127 * Request async osd read
1129 static int rbd_req_read(struct request *rq,
1130 struct rbd_device *rbd_dev,
1134 struct rbd_req_coll *coll,
1137 return rbd_do_op(rq, rbd_dev, NULL,
1138 (snapid ? snapid : CEPH_NOSNAP),
1142 ofs, len, bio, coll, coll_index);
1146 * Request sync osd read
1148 static int rbd_req_sync_read(struct rbd_device *dev,
1149 struct ceph_snap_context *snapc,
1156 return rbd_req_sync_op(dev, NULL,
1157 (snapid ? snapid : CEPH_NOSNAP),
1161 1, obj, ofs, len, buf, NULL, ver);
1165 * Request sync osd watch
/*
 * Acknowledge a watch notification back to the OSD (NOTIFY_ACK op
 * carrying our header version and the notify cookie).  NOTE(review):
 * extraction is missing parameter lines and several rbd_do_request
 * arguments.
 */
1167 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1172 struct ceph_osd_req_op *ops;
1173 struct page **pages = NULL;
1176 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1180 ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1181 ops[0].watch.cookie = notify_id;
1182 ops[0].watch.flag = 0;
1184 ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1191 rbd_simple_req_cb, 0, NULL);
1193 rbd_destroy_ops(ops);
1197 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1199 struct rbd_device *dev = (struct rbd_device *)data;
1205 dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1206 notify_id, (int)opcode);
1207 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1208 rc = __rbd_update_snaps(dev);
1209 mutex_unlock(&ctl_mutex);
1211 pr_warning(DRV_NAME "%d got notification but failed to update"
1212 " snaps: %d\n", dev->major, rc);
1214 rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1218 * Request sync osd watch
/*
 * Register a watch on the header object so changes trigger
 * rbd_watch_cb(); the resulting lingering request is stored in
 * dev->watch_request.  On failure the event is cancelled and ops are
 * freed.  NOTE(review): extraction is missing parameters and error
 * checks.
 */
1220 static int rbd_req_sync_watch(struct rbd_device *dev,
1224 struct ceph_osd_req_op *ops;
1225 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1227 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1231 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1232 (void *)dev, &dev->watch_event);
1236 ops[0].watch.ver = cpu_to_le64(ver);
1237 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1238 ops[0].watch.flag = 1;
1240 ret = rbd_req_sync_op(dev, NULL,
1243 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1246 &dev->watch_request, NULL);
1251 rbd_destroy_ops(ops);
/* error path: drop the event registered above */
1255 ceph_osdc_cancel_event(dev->watch_event);
1256 dev->watch_event = NULL;
1258 rbd_destroy_ops(ops);
1263 * Request sync osd unwatch
/*
 * Tear down the header-object watch registered by
 * rbd_req_sync_watch(): issue a WATCH op with flag=0 for our cookie,
 * then cancel the local event.  NOTE(review): extraction is missing
 * parameter lines and error checks.
 */
1265 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1268 struct ceph_osd_req_op *ops;
1270 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
/* flag=0 deregisters the watch identified by our cookie */
1274 ops[0].watch.ver = 0;
1275 ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1276 ops[0].watch.flag = 0;
1278 ret = rbd_req_sync_op(dev, NULL,
1281 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1283 1, obj, 0, 0, NULL, NULL, NULL);
1285 rbd_destroy_ops(ops);
1286 ceph_osdc_cancel_event(dev->watch_event);
1287 dev->watch_event = NULL;
/* Context handed to rbd_notify_cb() while waiting for a notify ack. */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1295 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1297 struct rbd_device *dev = (struct rbd_device *)data;
1301 dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1302 notify_id, (int)opcode);
1306 * Request sync osd notify
/*
 * Send a NOTIFY on @obj and wait (bounded by CEPH_OSD_TIMEOUT_DEFAULT)
 * for watchers to acknowledge via rbd_notify_cb().  Cleans up the
 * event and ops on all paths.  NOTE(review): extraction is missing
 * parameter lines and error checks.
 */
1308 static int rbd_req_sync_notify(struct rbd_device *dev,
1311 struct ceph_osd_req_op *ops;
1312 struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1313 struct ceph_osd_event *event;
1314 struct rbd_notify_info info;
1315 int payload_len = sizeof(u32) + sizeof(u32);
1318 ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1324 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1325 (void *)&info, &event);
1329 ops[0].watch.ver = 1;
1330 ops[0].watch.flag = 1;
1331 ops[0].watch.cookie = event->cookie;
1332 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1333 ops[0].watch.timeout = 12;
1335 ret = rbd_req_sync_op(dev, NULL,
1338 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1340 1, obj, 0, 0, NULL, NULL, NULL);
/* block until watchers ack or the timeout elapses */
1344 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1345 dout("ceph_osdc_wait_event returned %d\n", ret);
1346 rbd_destroy_ops(ops);
/* error path */
1350 ceph_osdc_cancel_event(event);
1352 rbd_destroy_ops(ops);
1357 * Request sync osd read
/*
 * Synchronously invoke an OSD class method (@cls.@method) on @obj
 * with @data as input payload; the header version is reported via
 * @ver.  NOTE(review): extraction is missing parameter lines and
 * error checks.
 */
1359 static int rbd_req_sync_exec(struct rbd_device *dev,
1367 struct ceph_osd_req_op *ops;
1368 int cls_len = strlen(cls);
1369 int method_len = strlen(method);
1370 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1371 cls_len + method_len + len);
1375 ops[0].cls.class_name = cls;
1376 ops[0].cls.class_len = (__u8)cls_len;
1377 ops[0].cls.method_name = method;
1378 ops[0].cls.method_len = (__u8)method_len;
1379 ops[0].cls.argc = 0;
1380 ops[0].cls.indata = data;
1381 ops[0].cls.indata_len = len;
1383 ret = rbd_req_sync_op(dev, NULL,
1386 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1388 1, obj, 0, 0, NULL, NULL, ver);
1390 rbd_destroy_ops(ops);
1392 dout("cls_exec returned %d\n", ret);
1396 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1398 struct rbd_req_coll *coll =
1399 kzalloc(sizeof(struct rbd_req_coll) +
1400 sizeof(struct rbd_req_status) * num_reqs,
1405 coll->total = num_reqs;
1406 kref_init(&coll->kref);
1411 * block device queue callback
/*
 * Block-layer request_fn: pull requests off the queue, split each one
 * into per-object segments, clone the bio chain per segment and issue
 * async OSD reads/writes tracked by an rbd_req_coll.  Runs with the
 * queue lock held; the lock is dropped around OSD submission.
 * NOTE(review): extraction is missing many lines of this function
 * (loop skeleton, segment loop, error checks).
 */
1413 static void rbd_rq_fn(struct request_queue *q)
1415 struct rbd_device *rbd_dev = q->queuedata;
1417 struct bio_pair *bp = NULL;
1419 rq = blk_fetch_request(q);
1423 struct bio *rq_bio, *next_bio = NULL;
1425 int size, op_size = 0;
1427 int num_segs, cur_seg = 0;
1428 struct rbd_req_coll *coll;
1430 /* peek at request from block layer */
1434 dout("fetched request\n");
1436 /* filter out block requests we don't understand */
1437 if ((rq->cmd_type != REQ_TYPE_FS)) {
1438 __blk_end_request_all(rq, 0);
1442 /* deduce our operation (read, write) */
1443 do_write = (rq_data_dir(rq) == WRITE);
1445 size = blk_rq_bytes(rq);
1446 ofs = blk_rq_pos(rq) * 512ULL;
1448 if (do_write && rbd_dev->read_only) {
1449 __blk_end_request_all(rq, -EROFS);
/* drop the queue lock while we talk to the OSDs */
1453 spin_unlock_irq(q->queue_lock);
1455 dout("%s 0x%x bytes at 0x%llx\n",
1456 do_write ? "write" : "read",
1457 size, blk_rq_pos(rq) * 512ULL);
1459 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1460 coll = rbd_alloc_coll(num_segs);
1462 spin_lock_irq(q->queue_lock);
1463 __blk_end_request_all(rq, -ENOMEM);
1468 /* a bio clone to be passed down to OSD req */
1469 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1470 op_size = rbd_get_segment(&rbd_dev->header,
1471 rbd_dev->header.block_name,
1474 kref_get(&coll->kref);
1475 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1476 op_size, GFP_ATOMIC);
1478 rbd_coll_end_req_index(rq, coll, cur_seg,
1484 /* init OSD command: write or read */
1486 rbd_req_write(rq, rbd_dev,
1487 rbd_dev->header.snapc,
1492 rbd_req_read(rq, rbd_dev,
1493 cur_snap_id(rbd_dev),
/* drop our own collection ref; completions hold the others */
1505 kref_put(&coll->kref, rbd_coll_release);
1508 bio_pair_release(bp);
1509 spin_lock_irq(q->queue_lock);
1511 rq = blk_fetch_request(q);
1516 * a queue callback. Makes sure that we don't create a bio that spans across
1517 * multiple osd objects. One exception would be with a single page bios,
1518 * which we handle later at bio_chain_clone
1520 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1521 struct bio_vec *bvec)
1523 struct rbd_device *rbd_dev = q->queuedata;
1524 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1525 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1526 unsigned int bio_sectors = bmd->bi_size >> 9;
1529 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1530 + bio_sectors)) << 9;
1532 max = 0; /* bio_add cannot handle a negative return */
1533 if (max <= bvec->bv_len && bio_sectors == 0)
1534 return bvec->bv_len;
1538 static void rbd_free_disk(struct rbd_device *rbd_dev)
1540 struct gendisk *disk = rbd_dev->disk;
1545 rbd_header_free(&rbd_dev->header);
1547 if (disk->flags & GENHD_FL_UP)
1550 blk_cleanup_queue(disk->queue);
1555 * reload the ondisk the header
/*
 * Fetch and decode the on-disk image header object, retrying with a
 * larger buffer when the snapshot count changed between the size
 * guess and the read.  NOTE(review): extraction is missing several
 * lines (retry loop skeleton, error checks).
 */
1557 static int rbd_read_header(struct rbd_device *rbd_dev,
1558 struct rbd_image_header *header)
1561 struct rbd_image_header_ondisk *dh;
1563 u64 snap_names_len = 0;
/* size the buffer for the currently known snapshot count */
1567 int len = sizeof(*dh) +
1568 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1572 dh = kmalloc(len, GFP_KERNEL);
1576 rc = rbd_req_sync_read(rbd_dev,
1578 rbd_dev->obj_md_name,
1584 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1587 pr_warning("unrecognized header format"
1588 " for image %s", rbd_dev->obj);
/* a snapshot appeared mid-read: retry with the updated counts */
1593 if (snap_count != header->total_snaps) {
1594 snap_count = header->total_snaps;
1595 snap_names_len = header->snap_names_len;
1596 rbd_header_free(header);
1602 header->obj_version = ver;
/*
 * Create a new snapshot: reserve a snapid from the monitor, then
 * invoke the "rbd" class "snap_add" method on the header object.
 * Only valid when the device points at the image head.  NOTE(review):
 * extraction is missing parameter lines and error checks (including
 * the "bad" encode label).
 */
1612 static int rbd_header_add_snap(struct rbd_device *dev,
1613 const char *snap_name,
1616 int name_len = strlen(snap_name);
1621 struct ceph_mon_client *monc;
1623 /* we should create a snapshot only if we're pointing at the head */
1627 monc = &dev->rbd_client->client->monc;
1628 ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1629 dout("created snapid=%lld\n", new_snapid);
1633 data = kmalloc(name_len + 16, gfp_flags);
1638 e = data + name_len + 16;
/* encode (name, snapid) as the snap_add method payload */
1640 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1641 ceph_encode_64_safe(&p, e, new_snapid, bad);
1643 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1644 data, p - data, &ver);
1651 dev->header.snapc->seq = new_snapid;
1658 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1660 struct rbd_snap *snap;
1662 while (!list_empty(&rbd_dev->snaps)) {
1663 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1664 __rbd_remove_snap_dev(rbd_dev, snap);
1669 * only read the first part of the ondisk header, without the snaps info
/*
 * Re-read the image header and swap in the new snapshot context under
 * snap_rwsem, preserving the current seq unless we were following the
 * head; then refresh the sysfs snapshot devices.  NOTE(review):
 * extraction is missing several lines (declarations, error checks,
 * the follow_seq branch skeleton).
 */
1671 static int __rbd_update_snaps(struct rbd_device *rbd_dev)
1674 struct rbd_image_header h;
1678 ret = rbd_read_header(rbd_dev, &h);
/* resize the block device to match the possibly new image size */
1683 set_capacity(rbd_dev->disk, h.image_size / 512ULL);
1685 down_write(&rbd_dev->header.snap_rwsem);
1687 snap_seq = rbd_dev->header.snapc->seq;
1688 if (rbd_dev->header.total_snaps &&
1689 rbd_dev->header.snapc->snaps[0] == snap_seq)
1690 /* pointing at the head, will need to follow that
/* free the old snapshot bookkeeping before installing the new */
1694 kfree(rbd_dev->header.snapc);
1695 kfree(rbd_dev->header.snap_names);
1696 kfree(rbd_dev->header.snap_sizes);
1698 rbd_dev->header.total_snaps = h.total_snaps;
1699 rbd_dev->header.snapc = h.snapc;
1700 rbd_dev->header.snap_names = h.snap_names;
1701 rbd_dev->header.snap_names_len = h.snap_names_len;
1702 rbd_dev->header.snap_sizes = h.snap_sizes;
1704 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1706 rbd_dev->header.snapc->seq = snap_seq;
1708 ret = __rbd_init_snaps_header(rbd_dev);
1710 up_write(&rbd_dev->header.snap_rwsem);
/*
 * Read the image header, select the mapped snapshot, and create the
 * gendisk + request queue for this device.  NOTE(review): extraction
 * is missing several lines (error checks, add_disk, error labels).
 */
1715 static int rbd_init_disk(struct rbd_device *rbd_dev)
1717 struct gendisk *disk;
1718 struct request_queue *q;
1722 /* contact OSD, request size info about the object being mapped */
1723 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1727 /* no need to lock here, as rbd_dev is not registered yet */
1728 rc = __rbd_init_snaps_header(rbd_dev);
1732 rc = rbd_header_set_snap(rbd_dev, &total_size);
1736 /* create gendisk info */
1738 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1742 snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
1744 disk->major = rbd_dev->major;
1745 disk->first_minor = 0;
1746 disk->fops = &rbd_bd_ops;
1747 disk->private_data = rbd_dev;
/* init rq: request queue served by rbd_rq_fn under rbd_dev->lock */
1751 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1755 /* set io sizes to object size */
1756 blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
1757 blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
1758 blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
1759 blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));
1761 blk_queue_merge_bvec(q, rbd_merge_bvec);
1764 q->queuedata = rbd_dev;
1766 rbd_dev->disk = disk;
1769 /* finally, announce the disk to the world */
1770 set_capacity(disk, total_size / 512ULL);
1773 pr_info("%s: added with size 0x%llx\n",
1774 disk->disk_name, (unsigned long long)total_size);
1787 static ssize_t rbd_size_show(struct device *dev,
1788 struct device_attribute *attr, char *buf)
1790 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1792 return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1795 static ssize_t rbd_major_show(struct device *dev,
1796 struct device_attribute *attr, char *buf)
1798 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1800 return sprintf(buf, "%d\n", rbd_dev->major);
1803 static ssize_t rbd_client_id_show(struct device *dev,
1804 struct device_attribute *attr, char *buf)
1806 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1808 return sprintf(buf, "client%lld\n",
1809 ceph_client_id(rbd_dev->rbd_client->client));
1812 static ssize_t rbd_pool_show(struct device *dev,
1813 struct device_attribute *attr, char *buf)
1815 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1817 return sprintf(buf, "%s\n", rbd_dev->pool_name);
1820 static ssize_t rbd_name_show(struct device *dev,
1821 struct device_attribute *attr, char *buf)
1823 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1825 return sprintf(buf, "%s\n", rbd_dev->obj);
1828 static ssize_t rbd_snap_show(struct device *dev,
1829 struct device_attribute *attr,
1832 struct rbd_device *rbd_dev = dev_to_rbd(dev);
1834 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/*
 * sysfs "refresh" attribute (write-only): re-read the image header and
 * resync the in-memory snapshot list with what the OSDs report.
 * NOTE(review): the buf/size parameters and the return-value plumbing
 * around rc are not visible in this excerpt.
 */
1837 static ssize_t rbd_image_refresh(struct device *dev,
1838 struct device_attribute *attr,
1842 struct rbd_device *rbd_dev = dev_to_rbd(dev);
/* nested annotation for lockdep; ctl_mutex guards device/snap lists */
1846 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1848 rc = __rbd_update_snaps(rbd_dev);
1852 mutex_unlock(&ctl_mutex);
/*
 * Per-device sysfs attributes.  The read-only ones (S_IRUGO) report
 * mapping state; "refresh" and "create_snap" are write-only control
 * hooks restricted to root (S_IWUSR).
 */
1856 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1857 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1858 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1859 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1860 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1861 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1862 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1863 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/*
 * Attribute array, group, and device_type glue that exposes the
 * per-device attributes above when the rbd device is registered.
 * NOTE(review): the NULL array terminators and the .name/.attrs field
 * initializers are not visible in this excerpt.
 */
1865 static struct attribute *rbd_attrs[] = {
1866 &dev_attr_size.attr,
1867 &dev_attr_major.attr,
1868 &dev_attr_client_id.attr,
1869 &dev_attr_pool.attr,
1870 &dev_attr_name.attr,
1871 &dev_attr_current_snap.attr,
1872 &dev_attr_refresh.attr,
1873 &dev_attr_create_snap.attr,
1877 static struct attribute_group rbd_attr_group = {
1881 static const struct attribute_group *rbd_attr_groups[] = {
/* release callback for the embedded struct device (body not visible) */
1886 static void rbd_sysfs_dev_release(struct device *dev)
1890 static struct device_type rbd_device_type = {
1892 .groups = rbd_attr_groups,
1893 .release = rbd_sysfs_dev_release,
1901 static ssize_t rbd_snap_size_show(struct device *dev,
1902 struct device_attribute *attr,
1905 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1907 return sprintf(buf, "%lld\n", (long long)snap->size);
1910 static ssize_t rbd_snap_id_show(struct device *dev,
1911 struct device_attribute *attr,
1914 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1916 return sprintf(buf, "%lld\n", (long long)snap->id);
/*
 * Per-snapshot sysfs attributes and their group.
 * NOTE(review): the NULL array terminator is not visible in this excerpt.
 */
1919 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1920 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1922 static struct attribute *rbd_snap_attrs[] = {
1923 &dev_attr_snap_size.attr,
1924 &dev_attr_snap_id.attr,
1928 static struct attribute_group rbd_snap_attr_group = {
1929 .attrs = rbd_snap_attrs,
/*
 * Release callback for a snapshot's sysfs device, invoked by the driver
 * core once the last reference is dropped after device_unregister().
 * NOTE(review): the body that presumably frees snap->name and snap is
 * not visible in this excerpt -- confirm both are freed here.
 */
1932 static void rbd_snap_dev_release(struct device *dev)
1934 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
/*
 * device_type glue for snapshot devices: attaches the snap attribute
 * group and the release callback above.
 */
1939 static const struct attribute_group *rbd_snap_attr_groups[] = {
1940 &rbd_snap_attr_group,
1944 static struct device_type rbd_snap_device_type = {
1945 .groups = rbd_snap_attr_groups,
1946 .release = rbd_snap_dev_release,
1949 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
1950 struct rbd_snap *snap)
1952 list_del(&snap->node);
1953 device_unregister(&snap->dev);
/*
 * Create the sysfs device for one snapshot under the given parent
 * (normally the rbd device itself).  The sysfs name is "snap_<name>".
 *
 * NOTE(review): dev->release is set here even though
 * rbd_snap_device_type.release points at the same function -- redundant
 * but harmless (the core prefers dev->release).  The handling of ret
 * after device_register() is not visible in this excerpt.
 */
1956 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957 struct rbd_snap *snap,
1958 struct device *parent)
1960 struct device *dev = &snap->dev;
1963 dev->type = &rbd_snap_device_type;
1964 dev->parent = parent;
1965 dev->release = rbd_snap_dev_release;
1966 dev_set_name(dev, "snap_%s", snap->name);
1967 ret = device_register(dev);
/*
 * Allocate an rbd_snap for header slot i, fill it from the parsed header
 * (per-snapshot size and rados snap id), and register its sysfs device
 * if the parent rbd device is already registered (during bus add, the
 * caller registers the snaps itself -- see rbd_bus_add_dev()).
 *
 * NOTE(review): the allocation-failure checks, list insertion and the
 * *snapp out-assignment are not visible in this excerpt; the kstrdup()
 * result is not visibly checked either.
 */
1972 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973 int i, const char *name,
1974 struct rbd_snap **snapp)
1977 struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1980 snap->name = kstrdup(name, GFP_KERNEL);
1981 snap->size = rbd_dev->header.snap_sizes[i];
1982 snap->id = rbd_dev->header.snapc->snaps[i];
1983 if (device_is_registered(&rbd_dev->dev)) {
1984 ret = rbd_register_snap_dev(rbd_dev, snap,
/*
 * Step backward to the previous name in a NUL-delimited string list.
 * 'start' is the beginning of the list; with fewer than two bytes before
 * 'name' there is no earlier entry.
 * NOTE(review): the backward-scan body and return statements are not
 * visible in this excerpt -- presumably returns NULL when out of names.
 */
1998 * search for the previous snap in a null delimited string list
2000 const char *rbd_prev_snap_name(const char *name, const char *start)
2002 if (name < start + 2)
/*
 * Merge the device's existing snapshot list with the freshly-read header.
 * The header stores snapshots newest-to-oldest; this walks both the old
 * list (backwards, deletion-safe) and the header names (backwards via
 * rbd_prev_snap_name) so entries are compared oldest-first.  Snapshots
 * missing from the header are removed; new ids are added in place.
 * NOTE(review): several loop-control and error-check lines are missing
 * from this excerpt.
 */
2015 * compare the old list of snapshots that we have to what's in the header
2016 * and update it accordingly. Note that the header holds the snapshots
2017 * in a reverse order (from newest to oldest) and we need to go from
2018 * older to new so that we don't get a duplicate snap name when
2019 * doing the process (e.g., removed snapshot and recreated a new
2020 * one with the same name.
2022 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2024 const char *name, *first_name;
/* i counts down through header slots; slot 0 is the newest snapshot */
2025 int i = rbd_dev->header.total_snaps;
2026 struct rbd_snap *snap, *old_snap = NULL;
2028 struct list_head *p, *n;
2030 first_name = rbd_dev->header.snap_names;
/* start one past the end; rbd_prev_snap_name() walks backwards */
2031 name = first_name + rbd_dev->header.snap_names_len;
2033 list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2036 old_snap = list_entry(p, struct rbd_snap, node);
2039 cur_id = rbd_dev->header.snapc->snaps[i - 1];
2041 if (!i || old_snap->id < cur_id) {
2042 /* old_snap->id was skipped, thus was removed */
2043 __rbd_remove_snap_dev(rbd_dev, old_snap);
2046 if (old_snap->id == cur_id) {
2047 /* we have this snapshot already */
2049 name = rbd_prev_snap_name(name, first_name);
2053 i--, name = rbd_prev_snap_name(name, first_name)) {
2058 cur_id = rbd_dev->header.snapc->snaps[i];
2059 /* snapshot removal? handle it above */
2060 if (cur_id >= old_snap->id)
2062 /* a new snapshot */
2063 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2067 /* note that we add it backward so using n and not p */
2068 list_add(&snap->node, n);
2072 /* we're done going over the old snap list, just add what's left */
2073 for (; i > 0; i--) {
2074 name = rbd_prev_snap_name(name, first_name);
2079 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2082 list_add(&snap->node, &rbd_dev->snaps);
/*
 * Parent device for every rbd device registered in sysfs.  The release
 * callback body is not visible here -- presumably a no-op, since the
 * device is statically allocated.
 */
2089 static void rbd_root_dev_release(struct device *dev)
2093 static struct device rbd_root_dev = {
2095 .release = rbd_root_dev_release,
/*
 * Register the rbd device -- and the snapshots already on its list --
 * on the rbd bus under rbd_root_dev, all under ctl_mutex.
 * NOTE(review): the error unwind between device_register() and the snap
 * loop (and the loop's failure handling) is not visible in this excerpt.
 */
2098 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2102 struct rbd_snap *snap;
2104 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2105 dev = &rbd_dev->dev;
2107 dev->bus = &rbd_bus_type;
2108 dev->type = &rbd_device_type;
2109 dev->parent = &rbd_root_dev;
2110 dev->release = rbd_dev_release;
/* sysfs name is the numeric rbd id, e.g. /sys/bus/rbd/devices/0 */
2111 dev_set_name(dev, "%d", rbd_dev->id);
2112 ret = device_register(dev);
2116 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2117 ret = rbd_register_snap_dev(rbd_dev, snap,
2123 mutex_unlock(&ctl_mutex);
2126 mutex_unlock(&ctl_mutex);
2130 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2132 device_unregister(&rbd_dev->dev);
/*
 * Establish a watch on the image's header object so updates made by
 * other clients (e.g. snapshot creation) notify this mapping.  On
 * -ERANGE -- presumably a stale header obj_version; verify against
 * rbd_req_sync_watch() -- refresh the snapshot context and retry.
 * NOTE(review): the do { ... } opening and the function's return are
 * not visible in this excerpt.
 */
2135 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2140 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2141 rbd_dev->header.obj_version);
2142 if (ret == -ERANGE) {
2143 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2144 rc = __rbd_update_snaps(rbd_dev);
2145 mutex_unlock(&ctl_mutex);
/* loop until the watch registers with a current obj_version */
2149 } while (ret == -ERANGE);
/* Highest rbd id handed out so far; ids start at 1 (see rbd_id_get()). */
2154 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2157 * Get a unique rbd identifier. The minimum rbd id is 1.
2159 static int rbd_id_get(void)
2161 return atomic64_inc_return(&rbd_id_max);
/*
 * Best-effort id reclamation: the cmpxchg only succeeds when the id
 * being released is the current maximum, so releasing any other id
 * leaves rbd_id_max untouched and that id is simply never reused.
 * NOTE(review): rbd_id is an int while the counter is atomic64 -- a
 * value above INT_MAX would be truncated; presumably unreachable in
 * practice, but worth confirming.
 */
2165 * Record that an rbd identifier is no longer in use.
2167 static void rbd_id_put(int rbd_id)
2172 * New id's are always one more than the current maximum.
2173 * If the id being "put" *is* that maximum, decrement the
2174 * maximum so the next one requested just reuses this one.
2176 atomic64_cmpxchg(&rbd_id_max, rbd_id, rbd_id - 1);
/*
 * rbd_add() -- handle a write to the /sys/bus/rbd/add control file.
 *
 * Expected input: "<mon addrs> <options> <pool> <image> [<snap>]".
 * Creates a new rbd_device: allocates and initializes it, assigns a
 * unique id, links it on the global device list, attaches (or shares)
 * a ceph client, resolves the pool id, registers a block major, adds
 * the device to the rbd bus, sets up the gendisk, and starts the
 * header watch.
 *
 * NOTE(review): many error checks and goto labels are missing from this
 * excerpt; the lines following the watch setup are the error-unwind
 * sequence.
 */
2179 static ssize_t rbd_add(struct bus_type *bus,
2183 struct ceph_osd_client *osdc;
2184 struct rbd_device *rbd_dev;
2185 ssize_t rc = -ENOMEM;
/* hold a module reference for the lifetime of the mapping */
2190 if (!try_module_get(THIS_MODULE))
2193 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2197 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2201 /* new rbd_device object */
2202 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2206 /* static rbd_device initialization */
2207 spin_lock_init(&rbd_dev->lock);
2208 INIT_LIST_HEAD(&rbd_dev->node);
2209 INIT_LIST_HEAD(&rbd_dev->snaps);
2211 init_rwsem(&rbd_dev->header.snap_rwsem);
2213 /* generate unique id: one more than highest used so far */
2214 rbd_dev->id = rbd_id_get();
2216 /* add to global list */
2217 spin_lock(&rbd_dev_list_lock);
2218 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2219 spin_unlock(&rbd_dev_list_lock);
2221 /* parse add command */
/*
 * NOTE(review): a "%Ns" conversion stores up to N characters plus a
 * terminating NUL -- confirm each destination buffer holds N + 1 bytes.
 * The snap name is optional, hence "< 4" rather than "!= 5".
 */
2222 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2223 "%" __stringify(RBD_MAX_OPT_LEN) "s "
2224 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2225 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2226 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2227 mon_dev_name, options, rbd_dev->pool_name,
2228 rbd_dev->obj, rbd_dev->snap_name) < 4) {
/* no snap name supplied: map the head ("-") */
2233 if (rbd_dev->snap_name[0] == 0)
2234 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2235 sizeof (RBD_SNAP_HEAD_NAME));
2237 rbd_dev->obj_len = strlen(rbd_dev->obj);
/* header object name is "<image>" + RBD_SUFFIX */
2238 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2239 rbd_dev->obj, RBD_SUFFIX);
2241 /* initialize rest of new object */
2242 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2244 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2245 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2246 mutex_unlock(&ctl_mutex);
/* resolve pool name to pool id via the current osdmap */
2252 osdc = &rbd_dev->rbd_client->client->osdc;
2253 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2255 goto err_out_client;
2256 rbd_dev->poolid = rc;
2258 /* register our block device */
/* major 0 requests dynamic allocation */
2259 irc = register_blkdev(0, rbd_dev->name);
2262 goto err_out_client;
2264 rbd_dev->major = irc;
2266 rc = rbd_bus_add_dev(rbd_dev);
2268 goto err_out_blkdev;
2270 /* set up and announce blkdev mapping */
2271 rc = rbd_init_disk(rbd_dev);
2275 rc = rbd_init_watch_dev(rbd_dev);
/* ---- error unwind from here down ---- */
2282 spin_lock(&rbd_dev_list_lock);
2283 list_del_init(&rbd_dev->node);
2284 spin_unlock(&rbd_dev_list_lock);
/*
 * NOTE(review): target_id is not declared in any visible line of this
 * function -- presumably this should be rbd_dev->id; verify.
 */
2285 rbd_id_put(target_id);
2287 /* this will also clean up rest of rbd_dev stuff */
2289 rbd_bus_del_dev(rbd_dev);
2291 kfree(mon_dev_name);
2295 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2297 rbd_put_client(rbd_dev);
2299 spin_lock(&rbd_dev_list_lock);
2300 list_del_init(&rbd_dev->node);
2301 spin_unlock(&rbd_dev_list_lock);
2302 rbd_id_put(target_id);
2308 kfree(mon_dev_name);
2310 dout("Error adding device %s\n", buf);
2311 module_put(THIS_MODULE);
2315 static struct rbd_device *__rbd_get_dev(unsigned long id)
2317 struct list_head *tmp;
2318 struct rbd_device *rbd_dev;
2320 spin_lock(&rbd_dev_list_lock);
2321 list_for_each(tmp, &rbd_dev_list) {
2322 rbd_dev = list_entry(tmp, struct rbd_device, node);
2323 if (rbd_dev->id == id) {
2324 spin_unlock(&rbd_dev_list_lock);
2328 spin_unlock(&rbd_dev_list_lock);
/*
 * Release callback for the rbd device's struct device; runs once the
 * last reference is dropped after device_unregister().  Cancels the
 * lingering watch request, unregisters the watch on the header object,
 * drops the ceph client, frees the gendisk, releases the block major,
 * and finally drops the module reference taken in rbd_add().
 * NOTE(review): freeing of rbd_dev itself is not visible in this
 * excerpt -- confirm it is freed here, otherwise it leaks.
 */
2332 static void rbd_dev_release(struct device *dev)
2334 struct rbd_device *rbd_dev =
2335 container_of(dev, struct rbd_device, dev);
2337 if (rbd_dev->watch_request) {
2338 struct ceph_client *client = rbd_dev->rbd_client->client;
2340 ceph_osdc_unregister_linger_request(&client->osdc,
2341 rbd_dev->watch_request);
2343 if (rbd_dev->watch_event)
2344 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2346 rbd_put_client(rbd_dev);
2348 /* clean up and free blkdev */
2349 rbd_free_disk(rbd_dev);
2350 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2353 /* release module ref */
2354 module_put(THIS_MODULE);
/*
 * rbd_remove() -- handle a write to the /sys/bus/rbd/remove control
 * file.  Parses the numeric device id, looks the device up, unlinks it
 * from the global list, releases its id, removes its snapshots and
 * unregisters it from the bus (final teardown then happens in
 * rbd_dev_release()).
 * NOTE(review): the not-found (-ENOENT) handling after __rbd_get_dev()
 * is not visible in this excerpt.
 */
2357 static ssize_t rbd_remove(struct bus_type *bus,
2361 struct rbd_device *rbd_dev = NULL;
2366 rc = strict_strtoul(buf, 10, &ul);
2370 /* convert to int; abort if we lost anything in the conversion */
2371 target_id = (int) ul;
2372 if (target_id != ul)
2375 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2377 rbd_dev = __rbd_get_dev(target_id);
2383 spin_lock(&rbd_dev_list_lock);
2384 list_del_init(&rbd_dev->node);
2385 spin_unlock(&rbd_dev_list_lock);
/* make the id reusable if it was the most recently allocated one */
2387 rbd_id_put(target_id);
2389 __rbd_remove_all_snaps(rbd_dev);
2390 rbd_bus_del_dev(rbd_dev);
2393 mutex_unlock(&ctl_mutex);
/*
 * sysfs "create_snap" attribute: create a snapshot named by the written
 * string, refresh the local snapshot list, then notify other watchers.
 *
 * NOTE(review): snprintf(name, count, ...) copies at most count - 1
 * bytes even though the buffer was allocated with count + 1.  That
 * strips a trailing character -- fine when sysfs supplies a trailing
 * newline, but it truncates the name by one byte when it does not;
 * consider using count + 1 as the size.
 */
2397 static ssize_t rbd_snap_add(struct device *dev,
2398 struct device_attribute *attr,
2402 struct rbd_device *rbd_dev = dev_to_rbd(dev);
2404 char *name = kmalloc(count + 1, GFP_KERNEL);
2408 snprintf(name, count, "%s", buf);
2410 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2412 ret = rbd_header_add_snap(rbd_dev,
2417 ret = __rbd_update_snaps(rbd_dev);
2421 /* shouldn't hold ctl_mutex when notifying.. notify might
2422 trigger a watch callback that would need to get that mutex */
2423 mutex_unlock(&ctl_mutex);
2425 /* make a best effort, don't error if failed */
2426 rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2433 mutex_unlock(&ctl_mutex);
/*
 * Bus-level control files: /sys/bus/rbd/add and /sys/bus/rbd/remove
 * (both write-only, root only).
 * NOTE(review): the __ATTR_NULL terminator is not visible in this
 * excerpt.
 */
2438 static struct bus_attribute rbd_bus_attrs[] = {
2439 __ATTR(add, S_IWUSR, NULL, rbd_add),
2440 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
/*
 * Register the rbd bus (with its add/remove control files) and the root
 * device everything hangs off.
 * NOTE(review): the error checks, the bus_unregister() unwind when
 * device_register() fails, and the return are not visible in this
 * excerpt.
 */
2445 * create control files in sysfs
2448 static int rbd_sysfs_init(void)
2452 rbd_bus_type.bus_attrs = rbd_bus_attrs;
2454 ret = bus_register(&rbd_bus_type);
2458 ret = device_register(&rbd_root_dev);
2463 static void rbd_sysfs_cleanup(void)
2465 device_unregister(&rbd_root_dev);
2466 bus_unregister(&rbd_bus_type);
/*
 * Module init: register the sysfs interface.  Actual devices are
 * created later via writes to /sys/bus/rbd/add.
 * NOTE(review): the error-return check after rbd_sysfs_init() and the
 * final return are not visible in this excerpt.
 */
2469 int __init rbd_init(void)
2473 rc = rbd_sysfs_init();
2476 pr_info("loaded " DRV_NAME_LONG "\n");
2480 void __exit rbd_exit(void)
2482 rbd_sysfs_cleanup();
/* Module registration hooks and metadata. */
2485 module_init(rbd_init);
2486 module_exit(rbd_exit);
2488 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2489 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2490 MODULE_DESCRIPTION("rados block device");
2492 /* following authorship retained from original osdblk.c */
2493 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2495 MODULE_LICENSE("GPL");