2 rbd.c -- Export ceph rados objects as a Linux block device
5 based on drivers/block/osdblk.c:
7 Copyright 2009 Red Hat, Inc.
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24 For usage instructions, please refer to:
26 Documentation/ABI/testing/sysfs-bus-rbd
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
40 #include <linux/blkdev.h>
42 #include "rbd_types.h"
45 * The basic unit of block I/O is a sector. It is interpreted in a
46 * number of contexts in Linux (blk, bio, genhd), but the default is
47 * universally 512 bytes. These symbols are just slightly more
48 * meaningful than the bare numbers they represent.
50 #define SECTOR_SHIFT 9
51 #define SECTOR_SIZE (1ULL << SECTOR_SHIFT)
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
56 #define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
58 #define RBD_MAX_SNAP_NAME_LEN 32
59 #define RBD_MAX_OPT_LEN 1024
61 #define RBD_SNAP_HEAD_NAME "-"
64 * An RBD device name will be "rbd#", where the "rbd" comes from
65 * RBD_DRV_NAME above, and # is a unique integer identifier.
66 * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67 * enough to hold all possible device names.
69 #define DEV_NAME_LEN 32
70 #define MAX_INT_FORMAT_WIDTH ((5 * sizeof (int)) / 2 + 1)
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
75 * block device image metadata (in-memory version)
77 struct rbd_image_header {
83 struct ceph_snap_context *snapc;
98 * an instance of the client. multiple devices may share an rbd client.
101 struct ceph_client *client;
102 struct rbd_options *rbd_opts;
104 struct list_head node;
108 * a request completion status
110 struct rbd_req_status {
117 * a collection of requests
119 struct rbd_req_coll {
123 struct rbd_req_status status[0];
127 * a single io request
130 struct request *rq; /* blk layer request */
131 struct bio *bio; /* cloned bio */
132 struct page **pages; /* list of used pages */
135 struct rbd_req_coll *coll;
142 struct list_head node;
150 int dev_id; /* blkdev unique id */
152 int major; /* blkdev assigned major */
153 struct gendisk *disk; /* blkdev's gendisk and rq */
154 struct request_queue *q;
156 struct rbd_client *rbd_client;
158 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160 spinlock_t lock; /* queue lock */
162 struct rbd_image_header header;
164 size_t image_name_len;
169 struct ceph_osd_event *watch_event;
170 struct ceph_osd_request *watch_request;
172 /* protects updating the header */
173 struct rw_semaphore header_rwsem;
174 /* name of the snapshot this device reads from */
176 /* id of the snapshot this device reads from */
177 u64 snap_id; /* current snapshot id */
178 /* whether the snap_id this device reads from still exists */
182 struct list_head node;
184 /* list of snapshots */
185 struct list_head snaps;
191 static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
193 static LIST_HEAD(rbd_dev_list); /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196 static LIST_HEAD(rbd_client_list); /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202 struct device_attribute *attr,
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212 static struct bus_attribute rbd_bus_attrs[] = {
213 __ATTR(add, S_IWUSR, NULL, rbd_add),
214 __ATTR(remove, S_IWUSR, NULL, rbd_remove),
218 static struct bus_type rbd_bus_type = {
220 .bus_attrs = rbd_bus_attrs,
static void rbd_root_dev_release(struct device *dev)
{
	/*
	 * Intentionally empty: rbd_root_dev is statically allocated,
	 * but the device core warns if a release method is absent.
	 */
}
227 static struct device rbd_root_dev = {
229 .release = rbd_root_dev_release,
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
235 return get_device(&rbd_dev->dev);
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
240 put_device(&rbd_dev->dev);
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
247 struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
249 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252 rbd_get_dev(rbd_dev);
253 set_device_ro(bdev, rbd_dev->read_only);
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
260 struct rbd_device *rbd_dev = disk->private_data;
262 rbd_put_dev(rbd_dev);
267 static const struct block_device_operations rbd_bd_ops = {
268 .owner = THIS_MODULE,
270 .release = rbd_release,
274 * Initialize an rbd client instance.
/*
 * NOTE(review): this extract is missing several lines (braces, error
 * labels, returns); the comments below describe only what is visible.
 * Allocates a struct rbd_client, creates and opens a ceph_client
 * session, and links the client into the global rbd_client_list.
 */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278 struct rbd_options *rbd_opts)
280 struct rbd_client *rbdc;
283 dout("rbd_client_create\n");
284 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288 kref_init(&rbdc->kref);
289 INIT_LIST_HEAD(&rbdc->node);
/* ctl_mutex serializes open/close/setup/teardown (see its decl) */
291 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
293 rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294 if (IS_ERR(rbdc->client))
296 ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
298 ret = ceph_open_session(rbdc->client);
302 rbdc->rbd_opts = rbd_opts;
304 spin_lock(&rbd_client_list_lock);
305 list_add_tail(&rbdc->node, &rbd_client_list);
306 spin_unlock(&rbd_client_list_lock);
308 mutex_unlock(&ctl_mutex);
310 dout("rbd_client_create created %p\n", rbdc);
/* error path: tear down the partially constructed ceph client */
314 ceph_destroy_client(rbdc->client);
316 mutex_unlock(&ctl_mutex);
/* failure before the client took ownership: free ceph_opts here */
320 ceph_destroy_options(ceph_opts);
325 * Find a ceph client with specific addr and configuration.
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
329 struct rbd_client *client_node;
331 if (ceph_opts->flags & CEPH_OPT_NOSHARE)
334 list_for_each_entry(client_node, &rbd_client_list, node)
335 if (!ceph_compare_options(ceph_opts, client_node->client))
348 /* string args above */
351 static match_table_t rbd_opts_tokens = {
352 {Opt_notify_timeout, "notify_timeout=%d"},
354 /* string args above */
/*
 * Parse one mount-style option token into struct rbd_options
 * (callback for ceph_parse_options()).
 * NOTE(review): extract is missing lines (returns, closing braces);
 * comments describe only what is visible.
 */
358 static int parse_rbd_opts_token(char *c, void *private)
360 struct rbd_options *rbd_opts = private;
361 substring_t argstr[MAX_OPT_ARGS];
362 int token, intval, ret;
364 token = match_token(c, rbd_opts_tokens, argstr);
/* tokens below Opt_last_int carry an integer argument */
368 if (token < Opt_last_int) {
369 ret = match_int(&argstr[0], &intval);
371 pr_err("bad mount option arg (not int) "
375 dout("got int token %d val %d\n", token, intval);
376 } else if (token > Opt_last_int && token < Opt_last_string) {
377 dout("got string token %d val %s\n", token,
380 dout("got token %d\n", token);
384 case Opt_notify_timeout:
385 rbd_opts->notify_timeout = intval;
394 * Get a ceph client with specific addr and configuration, if one does
395 * not exist create it.
/*
 * NOTE(review): extract is missing lines (error labels, returns).
 * On success returns a referenced rbd_client — an existing one when
 * an identical configuration is already in the list, otherwise a
 * freshly created one.
 */
397 static struct rbd_client *rbd_get_client(const char *mon_addr,
401 struct rbd_client *rbdc;
402 struct ceph_options *ceph_opts;
403 struct rbd_options *rbd_opts;
405 rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
407 return ERR_PTR(-ENOMEM);
/* default comes from RBD_NOTIFY_TIMEOUT_DEFAULT above */
409 rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
411 ceph_opts = ceph_parse_options(options, mon_addr,
412 mon_addr + mon_addr_len,
413 parse_rbd_opts_token, rbd_opts);
414 if (IS_ERR(ceph_opts)) {
416 return ERR_CAST(ceph_opts);
419 spin_lock(&rbd_client_list_lock);
420 rbdc = __rbd_client_find(ceph_opts);
422 /* using an existing client */
423 kref_get(&rbdc->kref);
424 spin_unlock(&rbd_client_list_lock);
/* sharing an existing client: our parsed options are redundant */
426 ceph_destroy_options(ceph_opts);
431 spin_unlock(&rbd_client_list_lock);
433 rbdc = rbd_client_create(ceph_opts, rbd_opts);
442 * Destroy ceph client
444 * Caller must hold rbd_client_list_lock.
446 static void rbd_client_release(struct kref *kref)
448 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
450 dout("rbd_release_client %p\n", rbdc);
451 spin_lock(&rbd_client_list_lock);
452 list_del(&rbdc->node);
453 spin_unlock(&rbd_client_list_lock);
455 ceph_destroy_client(rbdc->client);
456 kfree(rbdc->rbd_opts);
461 * Drop reference to ceph client node. If it's not referenced anymore, release
464 static void rbd_put_client(struct rbd_device *rbd_dev)
466 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467 rbd_dev->rbd_client = NULL;
471 * Destroy requests collection
473 static void rbd_coll_release(struct kref *kref)
475 struct rbd_req_coll *coll =
476 container_of(kref, struct rbd_req_coll, kref);
478 dout("rbd_coll_release %p\n", coll);
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
484 return !memcmp(&ondisk->text,
485 RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
489 * Create a new header structure, translate header format from the on-disk
/*
 * NOTE(review): extract is heavily gutted (error labels, returns,
 * braces missing); comments describe only what is visible.
 * Translates an on-disk header (little-endian) into the in-core
 * struct rbd_image_header, allocating snapc, snap_names, snap_sizes
 * and object_prefix.  Error paths free everything and NULL the
 * pointers so the header is safe to free again.
 */
492 static int rbd_header_from_disk(struct rbd_image_header *header,
493 struct rbd_image_header_ondisk *ondisk,
499 if (!rbd_dev_ondisk_valid(ondisk))
502 snap_count = le32_to_cpu(ondisk->snap_count);
504 /* Make sure we don't overflow below */
505 size = SIZE_MAX - sizeof (struct ceph_snap_context);
506 if (snap_count > size / sizeof (header->snapc->snaps[0]))
/* snap context is allocated with a trailing array of snap ids */
509 size = sizeof (struct ceph_snap_context);
510 size += snap_count * sizeof (header->snapc->snaps[0]);
511 header->snapc = kmalloc(size, GFP_KERNEL);
516 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
517 BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
518 header->snap_names = kmalloc(header->snap_names_len,
520 if (!header->snap_names)
522 size = snap_count * sizeof (*header->snap_sizes);
523 header->snap_sizes = kmalloc(size, GFP_KERNEL);
524 if (!header->snap_sizes)
/* snap_count == 0: an unsnapshotted image carries no name table */
527 WARN_ON(ondisk->snap_names_len);
528 header->snap_names_len = 0;
529 header->snap_names = NULL;
530 header->snap_sizes = NULL;
/* object prefix is NUL-terminated copy of the on-disk block_name */
533 size = sizeof (ondisk->block_name) + 1;
534 header->object_prefix = kmalloc(size, GFP_KERNEL);
535 if (!header->object_prefix)
537 memcpy(header->object_prefix, ondisk->block_name, size - 1);
538 header->object_prefix[size - 1] = '\0';
540 header->image_size = le64_to_cpu(ondisk->image_size);
541 header->obj_order = ondisk->options.order;
542 header->crypt_type = ondisk->options.crypt_type;
543 header->comp_type = ondisk->options.comp_type;
/* hand-initialized snap context: single reference, seq from disk */
545 atomic_set(&header->snapc->nref, 1);
546 header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
547 header->snapc->num_snaps = snap_count;
548 header->total_snaps = snap_count;
/* only fill snapshot data when caller allocated room for all snaps */
550 if (snap_count && allocated_snaps == snap_count) {
553 for (i = 0; i < snap_count; i++) {
554 header->snapc->snaps[i] =
555 le64_to_cpu(ondisk->snaps[i].id);
556 header->snap_sizes[i] =
557 le64_to_cpu(ondisk->snaps[i].image_size);
560 /* copy snapshot names */
561 memcpy(header->snap_names, &ondisk->snaps[snap_count],
562 header->snap_names_len);
/* error unwind: free in reverse allocation order, NULL the pointers */
568 kfree(header->snap_sizes);
569 header->snap_sizes = NULL;
571 kfree(header->snap_names);
572 header->snap_names = NULL;
573 header->snap_names_len = 0;
575 kfree(header->snapc);
576 header->snapc = NULL;
581 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
585 char *p = header->snap_names;
587 for (i = 0; i < header->total_snaps; i++) {
588 if (!strcmp(snap_name, p)) {
590 /* Found it. Pass back its id and/or size */
593 *seq = header->snapc->snaps[i];
595 *size = header->snap_sizes[i];
598 p += strlen(p) + 1; /* Skip ahead to the next name */
603 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
607 down_write(&rbd_dev->header_rwsem);
609 if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
610 sizeof (RBD_SNAP_HEAD_NAME))) {
611 rbd_dev->snap_id = CEPH_NOSNAP;
612 rbd_dev->snap_exists = false;
613 rbd_dev->read_only = 0;
615 *size = rbd_dev->header.image_size;
619 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
623 rbd_dev->snap_id = snap_id;
624 rbd_dev->snap_exists = true;
625 rbd_dev->read_only = 1;
630 up_write(&rbd_dev->header_rwsem);
634 static void rbd_header_free(struct rbd_image_header *header)
636 kfree(header->object_prefix);
637 header->object_prefix = NULL;
638 kfree(header->snap_sizes);
639 header->snap_sizes = NULL;
640 kfree(header->snap_names);
641 header->snap_names = NULL;
642 header->snap_names_len = 0;
643 ceph_put_snap_context(header->snapc);
644 header->snapc = NULL;
648 * get the actual striped segment name, offset and length
650 static u64 rbd_get_segment(struct rbd_image_header *header,
651 const char *object_prefix,
653 char *seg_name, u64 *segofs)
655 u64 seg = ofs >> header->obj_order;
658 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
659 "%s.%012llx", object_prefix, seg);
661 ofs = ofs & ((1 << header->obj_order) - 1);
662 len = min_t(u64, len, (1 << header->obj_order) - ofs);
670 static int rbd_get_num_segments(struct rbd_image_header *header,
673 u64 start_seg = ofs >> header->obj_order;
674 u64 end_seg = (ofs + len - 1) >> header->obj_order;
675 return end_seg - start_seg + 1;
679 * returns the size of an object in the image
681 static u64 rbd_obj_bytes(struct rbd_image_header *header)
683 return 1 << header->obj_order;
690 static void bio_chain_put(struct bio *chain)
696 chain = chain->bi_next;
702 * zeros a bio chain, starting at specific offset
704 static void zero_bio_chain(struct bio *chain, int start_ofs)
713 bio_for_each_segment(bv, chain, i) {
714 if (pos + bv->bv_len > start_ofs) {
715 int remainder = max(start_ofs - pos, 0);
716 buf = bvec_kmap_irq(bv, &flags);
717 memset(buf + remainder, 0,
718 bv->bv_len - remainder);
719 bvec_kunmap_irq(buf, &flags);
724 chain = chain->bi_next;
729 * bio_chain_clone - clone a chain of bios up to a certain length.
730 * might return a bio_pair that will need to be released.
/*
 * NOTE(review): extract is missing lines (error labels, loop frame).
 * NOTE(review): as shown, lines "bp = bio_split(...)" and
 * "__bio_clone(tmp, &bp->bio1)" assign/dereference the bio_pair **
 * without indirection — mainline uses "*bp = bio_split(...)" and
 * "&(*bp)->bio1"; this would not compile as written.  Confirm
 * against the upstream driver before relying on this text.
 */
732 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
733 struct bio_pair **bp,
734 int len, gfp_t gfpmask)
736 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
/* release any leftover split from a previous call */
740 bio_pair_release(*bp);
744 while (old_chain && (total < len)) {
745 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
/* this bio straddles the requested length: split it */
749 if (total + old_chain->bi_size > len) {
753 * this split can only happen with a single paged bio,
754 * split_bio will BUG_ON if this is not the case
756 dout("bio_chain_clone split! total=%d remaining=%d"
758 total, len - total, old_chain->bi_size);
760 /* split the bio. We'll release it either in the next
761 call, or it will have to be released outside */
762 bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
766 __bio_clone(tmp, &bp->bio1);
770 __bio_clone(tmp, old_chain);
771 *next = old_chain->bi_next;
/* after the first allocation, do not sleep for the rest */
775 gfpmask &= ~__GFP_WAIT;
779 new_chain = tail = tmp;
784 old_chain = old_chain->bi_next;
786 total += tmp->bi_size;
792 tail->bi_next = NULL;
/* error path: free whatever was cloned so far */
799 dout("bio_chain_clone with err\n");
800 bio_chain_put(new_chain);
805 * helpers for osd request op vectors.
807 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
808 int opcode, u32 payload_len)
810 struct ceph_osd_req_op *ops;
812 ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
819 * op extent offset and length will be set later on
820 * in calc_raw_layout()
822 ops[0].payload_len = payload_len;
/* Free an op vector allocated by rbd_create_rw_ops() (NULL is OK). */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
/*
 * Complete one slot of a request collection and, while holding the
 * queue lock, flush any contiguous run of finished slots back to the
 * block layer in order.
 * NOTE(review): extract is missing lines (early-exit branches, loop
 * braces); comments describe only what is visible.
 */
832 static void rbd_coll_end_req_index(struct request *rq,
833 struct rbd_req_coll *coll,
837 struct request_queue *q;
840 dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
841 coll, index, ret, (unsigned long long) len);
/* no collection: complete the blk request range directly */
847 blk_end_request(rq, ret, len);
/* queue lock protects the collection status array */
853 spin_lock_irq(q->queue_lock);
854 coll->status[index].done = 1;
855 coll->status[index].rc = ret;
856 coll->status[index].bytes = len;
857 max = min = coll->num_done;
858 while (max < coll->total && coll->status[max].done)
861 for (i = min; i<max; i++) {
862 __blk_end_request(rq, coll->status[i].rc,
863 coll->status[i].bytes);
865 kref_put(&coll->kref, rbd_coll_release);
867 spin_unlock_irq(q->queue_lock);
870 static void rbd_coll_end_req(struct rbd_request *req,
873 rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
877 * Send ceph osd request
/*
 * NOTE(review): extract is heavily gutted (parameter list, error
 * labels, returns missing); comments describe only what is visible.
 * Builds and submits one OSD request for an object extent, either
 * asynchronously (rbd_cb completion) or synchronously (wait below).
 */
879 static int rbd_do_request(struct request *rq,
880 struct rbd_device *rbd_dev,
881 struct ceph_snap_context *snapc,
883 const char *object_name, u64 ofs, u64 len,
888 struct ceph_osd_req_op *ops,
889 struct rbd_req_coll *coll,
891 void (*rbd_cb)(struct ceph_osd_request *req,
892 struct ceph_msg *msg),
893 struct ceph_osd_request **linger_req,
896 struct ceph_osd_request *req;
897 struct ceph_file_layout *layout;
900 struct timespec mtime = CURRENT_TIME;
901 struct rbd_request *req_data;
902 struct ceph_osd_request_head *reqhead;
903 struct ceph_osd_client *osdc;
905 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
/* allocation failed: fail this slot of the collection and bail */
908 rbd_coll_end_req_index(rq, coll, coll_index,
914 req_data->coll = coll;
915 req_data->coll_index = coll_index;
918 dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
919 (unsigned long long) ofs, (unsigned long long) len);
921 osdc = &rbd_dev->rbd_client->client->osdc;
922 req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
923 false, GFP_NOIO, pages, bio);
929 req->r_callback = rbd_cb;
933 req_data->pages = pages;
936 req->r_priv = req_data;
938 reqhead = req->r_request->front.iov_base;
939 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
941 strncpy(req->r_oid, object_name, sizeof(req->r_oid));
942 req->r_oid_len = strlen(req->r_oid);
/* single-object layout: stripe unit == object size, one stripe */
944 layout = &req->r_file_layout;
945 memset(layout, 0, sizeof(*layout));
946 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
947 layout->fl_stripe_count = cpu_to_le32(1);
948 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
949 layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
950 ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
953 ceph_osdc_build_request(req, ofs, &len,
957 req->r_oid, req->r_oid_len);
/* linger keeps the request re-sent across OSD map changes (watch) */
960 ceph_osdc_set_request_linger(osdc, req);
964 ret = ceph_osdc_start_request(osdc, req, false);
/* synchronous path: no callback was supplied, wait here */
969 ret = ceph_osdc_wait_request(osdc, req);
971 *ver = le64_to_cpu(req->r_reassert_version.version);
972 dout("reassert_ver=%llu\n",
974 le64_to_cpu(req->r_reassert_version.version));
975 ceph_osdc_put_request(req);
/* error path: drop the cloned bio and the osd request */
980 bio_chain_put(req_data->bio);
981 ceph_osdc_put_request(req);
983 rbd_coll_end_req(req_data, ret, len);
989 * Ceph osd op callback
/*
 * Completion callback for async OSD requests.  Decodes the reply,
 * zero-fills reads that hit a hole (-ENOENT) or came back short, and
 * completes the request's slot in its collection.
 * NOTE(review): extract is missing lines (braces, kfree of req_data
 * presumably); comments describe only what is visible.
 */
991 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
993 struct rbd_request *req_data = req->r_priv;
994 struct ceph_osd_reply_head *replyhead;
995 struct ceph_osd_op *op;
1001 replyhead = msg->front.iov_base;
1002 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1003 op = (void *)(replyhead + 1);
1004 rc = le32_to_cpu(replyhead->result);
1005 bytes = le64_to_cpu(op->extent.length);
1006 read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1008 dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1009 (unsigned long long) bytes, read_op, (int) rc);
/* nonexistent object reads as all zeros, not as an error */
1011 if (rc == -ENOENT && read_op) {
1012 zero_bio_chain(req_data->bio, 0);
/* short read: zero the tail and report the full length */
1014 } else if (rc == 0 && read_op && bytes < req_data->len) {
1015 zero_bio_chain(req_data->bio, bytes);
1016 bytes = req_data->len;
1019 rbd_coll_end_req(req_data, rc, bytes);
1022 bio_chain_put(req_data->bio);
1024 ceph_osdc_put_request(req);
/* Completion callback that only drops the osd request reference. */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1034 * Do a synchronous ceph osd operation
/*
 * NOTE(review): extract is missing lines (parameters, braces,
 * returns); comments describe only what is visible.  Allocates a
 * page vector for the transfer, issues the request synchronously via
 * rbd_do_request() (no callback), and for reads copies the result
 * back into the caller's buffer.
 */
1036 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1037 struct ceph_snap_context *snapc,
1040 struct ceph_osd_req_op *ops,
1041 const char *object_name,
1044 struct ceph_osd_request **linger_req,
1048 struct page **pages;
1051 BUG_ON(ops == NULL);
1053 num_pages = calc_pages_for(ofs , len);
1054 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1056 return PTR_ERR(pages);
1058 ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1059 object_name, ofs, len, NULL,
/* on a successful read, ret is the byte count to copy out */
1069 if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1073 ceph_release_page_vector(pages, num_pages);
1078 * Do an asynchronous ceph osd operation
/*
 * NOTE(review): extract is missing lines (parameters, error paths).
 * Maps an image extent onto its single containing object segment and
 * issues one async OSD op for it, completing via rbd_req_cb.
 */
1080 static int rbd_do_op(struct request *rq,
1081 struct rbd_device *rbd_dev,
1082 struct ceph_snap_context *snapc,
1084 int opcode, int flags,
1087 struct rbd_req_coll *coll,
1094 struct ceph_osd_req_op *ops;
1097 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1101 seg_len = rbd_get_segment(&rbd_dev->header,
1102 rbd_dev->header.object_prefix,
1104 seg_name, &seg_ofs);
/* writes carry the data as payload; reads have none */
1106 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1109 ops = rbd_create_rw_ops(1, opcode, payload_len);
1113 /* we've taken care of segment sizes earlier when we
1114 cloned the bios. We should never have a segment
1115 truncated at this point */
1116 BUG_ON(seg_len < len);
1118 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1119 seg_name, seg_ofs, seg_len,
1125 rbd_req_cb, 0, NULL);
1127 rbd_destroy_ops(ops);
1134 * Request async osd write
/* Thin wrapper: async CEPH_OSD_OP_WRITE against the head (CEPH_NOSNAP),
 * flushed to disk (ONDISK).  NOTE(review): extract is missing the
 * opcode/parameter lines between those shown. */
1136 static int rbd_req_write(struct request *rq,
1137 struct rbd_device *rbd_dev,
1138 struct ceph_snap_context *snapc,
1141 struct rbd_req_coll *coll,
1144 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1146 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1147 ofs, len, bio, coll, coll_index);
1151 * Request async osd read
/* Thin wrapper: async read, no snap context needed for reads.
 * NOTE(review): extract is missing the snapid/opcode/flag lines. */
1153 static int rbd_req_read(struct request *rq,
1154 struct rbd_device *rbd_dev,
1158 struct rbd_req_coll *coll,
1161 return rbd_do_op(rq, rbd_dev, NULL,
1165 ofs, len, bio, coll, coll_index);
1169 * Request sync osd read
/* Synchronous read of [ofs, ofs+len) from object_name into buf;
 * optionally passes back the object version via ver.
 * NOTE(review): extract is missing parameter and return lines. */
1171 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1173 const char *object_name,
1178 struct ceph_osd_req_op *ops;
1181 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1185 ret = rbd_req_sync_op(rbd_dev, NULL,
1188 ops, object_name, ofs, len, buf, NULL, ver);
1189 rbd_destroy_ops(ops);
1195 * Acknowledge a watch notification (NOTIFY_ACK) on the header object.
/* NOTE(review): the original one-line comment said "Request sync osd
 * watch", which was stale — this sends a notify acknowledgment.
 * Extract is missing parameter/return lines. */
1197 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1201 struct ceph_osd_req_op *ops;
1204 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1208 ops[0].watch.ver = cpu_to_le64(ver);
1209 ops[0].watch.cookie = notify_id;
1210 ops[0].watch.flag = 0;
/* fire-and-forget: rbd_simple_req_cb just drops the request ref */
1212 ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1213 rbd_dev->header_name, 0, 0, NULL,
1218 rbd_simple_req_cb, 0, NULL);
1220 rbd_destroy_ops(ops);
1224 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1226 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1233 dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1234 rbd_dev->header_name, (unsigned long long) notify_id,
1235 (unsigned int) opcode);
1236 rc = rbd_refresh_header(rbd_dev, &hver);
1238 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1239 " update snaps: %d\n", rbd_dev->major, rc);
1241 rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1245 * Request sync osd watch
/*
 * Register a watch on the header object so rbd_watch_cb fires when
 * it changes.  The watch request lingers (rbd_dev->watch_request) so
 * the OSD client re-establishes it across map changes.
 * NOTE(review): extract is missing error-check and return lines.
 */
1247 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1249 struct ceph_osd_req_op *ops;
1250 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1253 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1257 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1258 (void *)rbd_dev, &rbd_dev->watch_event);
1262 ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1263 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1264 ops[0].watch.flag = 1;
1266 ret = rbd_req_sync_op(rbd_dev, NULL,
1268 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1270 rbd_dev->header_name,
1272 &rbd_dev->watch_request, NULL);
1277 rbd_destroy_ops(ops);
/* error path: cancel the event we registered above */
1281 ceph_osdc_cancel_event(rbd_dev->watch_event);
1282 rbd_dev->watch_event = NULL;
1284 rbd_destroy_ops(ops);
1289 * Request sync osd unwatch
/*
 * Tear down the watch registered by rbd_req_sync_watch(): flag = 0
 * with the same cookie unregisters it, then the local event is
 * cancelled.  NOTE(review): extract is missing braces/returns.
 */
1291 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1293 struct ceph_osd_req_op *ops;
1296 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1300 ops[0].watch.ver = 0;
1301 ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1302 ops[0].watch.flag = 0;
1304 ret = rbd_req_sync_op(rbd_dev, NULL,
1306 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1308 rbd_dev->header_name,
1309 0, 0, NULL, NULL, NULL);
1312 rbd_destroy_ops(ops);
1313 ceph_osdc_cancel_event(rbd_dev->watch_event);
1314 rbd_dev->watch_event = NULL;
1318 struct rbd_notify_info {
1319 struct rbd_device *rbd_dev;
1322 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1324 struct rbd_device *rbd_dev = (struct rbd_device *)data;
1328 dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1329 rbd_dev->header_name, (unsigned long long) notify_id,
1330 (unsigned int) opcode);
1334 * Request sync osd notify
/*
 * Send a NOTIFY on the header object (e.g. after creating a snapshot)
 * and wait for watchers to acknowledge, bounded by a timeout.
 * NOTE(review): extract is missing error-check and return lines.
 */
1336 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1338 struct ceph_osd_req_op *ops;
1339 struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1340 struct ceph_osd_event *event;
1341 struct rbd_notify_info info;
/* payload: protocol version + timeout, two 32-bit values */
1342 int payload_len = sizeof(u32) + sizeof(u32);
1345 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1349 info.rbd_dev = rbd_dev;
/* one_shot event: fires once when the notify completes */
1351 ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1352 (void *)&info, &event);
1356 ops[0].watch.ver = 1;
1357 ops[0].watch.flag = 1;
1358 ops[0].watch.cookie = event->cookie;
1359 ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1360 ops[0].watch.timeout = 12;
1362 ret = rbd_req_sync_op(rbd_dev, NULL,
1364 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1366 rbd_dev->header_name,
1367 0, 0, NULL, NULL, NULL);
1371 ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1372 dout("ceph_osdc_wait_event returned %d\n", ret);
1373 rbd_destroy_ops(ops);
/* error path: cancel the pending event */
1377 ceph_osdc_cancel_event(event);
1379 rbd_destroy_ops(ops);
1384 * Synchronously execute an OSD class method (CEPH_OSD_OP_CALL).
/* NOTE(review): the original one-line comment said "Request sync osd
 * read", which was stale — this issues a class-method call with the
 * given input data.  Extract is missing parameter/return lines. */
1386 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1387 const char *object_name,
1388 const char *class_name,
1389 const char *method_name,
1394 struct ceph_osd_req_op *ops;
1395 int class_name_len = strlen(class_name);
1396 int method_name_len = strlen(method_name);
/* payload carries class name + method name + input data */
1399 ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1400 class_name_len + method_name_len + len);
1404 ops[0].cls.class_name = class_name;
1405 ops[0].cls.class_len = (__u8) class_name_len;
1406 ops[0].cls.method_name = method_name;
1407 ops[0].cls.method_len = (__u8) method_name_len;
1408 ops[0].cls.argc = 0;
1409 ops[0].cls.indata = data;
1410 ops[0].cls.indata_len = len;
1412 ret = rbd_req_sync_op(rbd_dev, NULL,
1414 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1416 object_name, 0, 0, NULL, NULL, ver);
1418 rbd_destroy_ops(ops);
1420 dout("cls_exec returned %d\n", ret);
1424 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1426 struct rbd_req_coll *coll =
1427 kzalloc(sizeof(struct rbd_req_coll) +
1428 sizeof(struct rbd_req_status) * num_reqs,
1433 coll->total = num_reqs;
1434 kref_init(&coll->kref);
1439 * block device queue callback
/*
 * Request-function of the rbd queue: fetches blk requests, splits
 * each into per-object-segment OSD ops, and submits them async.
 * NOTE(review): extract is heavily gutted (inner do/while loop frame,
 * continues, braces missing); comments describe only what is visible.
 */
1441 static void rbd_rq_fn(struct request_queue *q)
1443 struct rbd_device *rbd_dev = q->queuedata;
1445 struct bio_pair *bp = NULL;
1447 while ((rq = blk_fetch_request(q))) {
1449 struct bio *rq_bio, *next_bio = NULL;
1454 int num_segs, cur_seg = 0;
1455 struct rbd_req_coll *coll;
1456 struct ceph_snap_context *snapc;
1458 /* peek at request from block layer */
1462 dout("fetched request\n");
1464 /* filter out block requests we don't understand */
1465 if ((rq->cmd_type != REQ_TYPE_FS)) {
1466 __blk_end_request_all(rq, 0);
1470 /* deduce our operation (read, write) */
1471 do_write = (rq_data_dir(rq) == WRITE);
1473 size = blk_rq_bytes(rq);
1474 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1476 if (do_write && rbd_dev->read_only) {
1477 __blk_end_request_all(rq, -EROFS);
/* drop the queue lock while talking to the OSDs */
1481 spin_unlock_irq(q->queue_lock);
1483 down_read(&rbd_dev->header_rwsem);
/* mapped snapshot was deleted underneath us: fail with -ENXIO */
1485 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1486 up_read(&rbd_dev->header_rwsem);
1487 dout("request for non-existent snapshot");
1488 spin_lock_irq(q->queue_lock);
1489 __blk_end_request_all(rq, -ENXIO);
/* take a snapc ref so writes see a stable snapshot context */
1493 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1495 up_read(&rbd_dev->header_rwsem);
1497 dout("%s 0x%x bytes at 0x%llx\n",
1498 do_write ? "write" : "read",
1499 size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1501 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1502 coll = rbd_alloc_coll(num_segs);
1504 spin_lock_irq(q->queue_lock);
1505 __blk_end_request_all(rq, -ENOMEM);
1506 ceph_put_snap_context(snapc);
1511 /* a bio clone to be passed down to OSD req */
1512 dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1513 op_size = rbd_get_segment(&rbd_dev->header,
1514 rbd_dev->header.object_prefix,
/* one collection ref per submitted segment */
1517 kref_get(&coll->kref);
1518 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1519 op_size, GFP_ATOMIC);
1521 rbd_coll_end_req_index(rq, coll, cur_seg,
1527 /* init OSD command: write or read */
1529 rbd_req_write(rq, rbd_dev,
1535 rbd_req_read(rq, rbd_dev,
/* drop the allocation reference; completions hold the rest */
1548 kref_put(&coll->kref, rbd_coll_release);
1551 bio_pair_release(bp);
1552 spin_lock_irq(q->queue_lock);
1554 ceph_put_snap_context(snapc);
1559 * a queue callback. Makes sure that we don't create a bio that spans across
1560 * multiple osd objects. One exception would be with a single page bios,
1561 * which we handle later at bio_chain_clone
1563 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1564 struct bio_vec *bvec)
1566 struct rbd_device *rbd_dev = q->queuedata;
1567 unsigned int chunk_sectors;
1569 unsigned int bio_sectors;
1572 chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1573 sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1574 bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1576 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1577 + bio_sectors)) << SECTOR_SHIFT;
1579 max = 0; /* bio_add cannot handle a negative return */
1580 if (max <= bvec->bv_len && bio_sectors == 0)
1581 return bvec->bv_len;
/*
 * Release the gendisk, its queue, and the in-core header for a
 * device being torn down.
 * NOTE(review): extract is missing lines (NULL check, del_gendisk,
 * put_disk presumably); comments describe only what is visible.
 */
1585 static void rbd_free_disk(struct rbd_device *rbd_dev)
1587 struct gendisk *disk = rbd_dev->disk;
1592 rbd_header_free(&rbd_dev->header);
1594 if (disk->flags & GENHD_FL_UP)
1597 blk_cleanup_queue(disk->queue);
1602 * Re-read the on-disk header into the in-core header structure.
/*
 * NOTE(review): extract is missing the retry-loop frame, error
 * labels and returns; comments describe only what is visible.
 */
1604 static int rbd_read_header(struct rbd_device *rbd_dev,
1605 struct rbd_image_header *header)
1608 struct rbd_image_header_ondisk *dh;
1614 * First reads the fixed-size header to determine the number
1615 * of snapshots, then re-reads it, along with all snapshot
1616 * records as well as their stored names.
1620 dh = kmalloc(len, GFP_KERNEL)
1624 rc = rbd_req_sync_read(rbd_dev,
1626 rbd_dev->header_name,
1632 rc = rbd_header_from_disk(header, dh, snap_count);
1635 pr_warning("unrecognized header format"
1637 rbd_dev->image_name);
/* stable snapshot count means we read a consistent header: done */
1641 if (snap_count == header->total_snaps)
/* otherwise resize the buffer for the new snap count and retry */
1644 snap_count = header->total_snaps;
1645 len = sizeof (*dh) +
1646 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1647 header->snap_names_len;
1649 rbd_header_free(header);
1652 header->obj_version = ver;
/*
 * Create a new snapshot: obtain a snap id from the monitor, then
 * invoke the "snap_add" class method on the header object with the
 * encoded name + id.
 * NOTE(review): extract is missing lines (error checks, class/method
 * name arguments, bad: label); comments describe only what is visible.
 */
1662 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1663 const char *snap_name,
1666 int name_len = strlen(snap_name);
1670 struct ceph_mon_client *monc;
1672 /* we should create a snapshot only if we're pointing at the head */
1673 if (rbd_dev->snap_id != CEPH_NOSNAP)
1676 monc = &rbd_dev->rbd_client->client->monc;
1677 ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1678 dout("created snapid=%llu\n", (unsigned long long) new_snapid);
/* buffer: encoded string (4-byte len + name) plus 64-bit snap id */
1682 data = kmalloc(name_len + 16, gfp_flags);
1687 e = data + name_len + 16;
1689 ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1690 ceph_encode_64_safe(&p, e, new_snapid, bad);
1692 ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1694 data, p - data, NULL);
1698 return ret < 0 ? ret : 0;
/*
 * Unregister and drop every snapshot device hanging off this rbd
 * device.  The _safe iterator is required because each entry is
 * removed from the list as it is visited.
 */
1703 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1705 struct rbd_snap *snap;
1706 struct rbd_snap *next;
1708 list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1709 __rbd_remove_snap_dev(snap);
1713 * only read the first part of the on-disk header, without the snaps info
/*
 * Re-read the image header and fold the fresh values into
 * rbd_dev->header under header_rwsem, then resync the snapshot device
 * list.  Must be called with ctl_mutex held (see rbd_refresh_header()).
 */
1715 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1718 struct rbd_image_header h;
1720 ret = rbd_read_header(rbd_dev, &h);
1724 down_write(&rbd_dev->header_rwsem);
/* resize the block device only when the writable head image is mapped */
1727 if (rbd_dev->snap_id == CEPH_NOSNAP) {
/*
 * NOTE(review): the cast binds to h.image_size before the division;
 * if sector_t is 32 bits this truncates large image sizes -- prefer
 * (sector_t)(h.image_size / SECTOR_SIZE).  Confirm against full source.
 */
1728 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1730 dout("setting size to %llu sectors", (unsigned long long) size);
1731 set_capacity(rbd_dev->disk, size);
1734 /* rbd_dev->header.object_prefix shouldn't change */
1735 kfree(rbd_dev->header.snap_sizes);
1736 kfree(rbd_dev->header.snap_names);
1737 /* osd requests may still refer to snapc */
1738 ceph_put_snap_context(rbd_dev->header.snapc);
/* hand the new object version back to the caller if requested */
1741 *hver = h.obj_version;
1742 rbd_dev->header.obj_version = h.obj_version;
1743 rbd_dev->header.image_size = h.image_size;
1744 rbd_dev->header.total_snaps = h.total_snaps;
1745 rbd_dev->header.snapc = h.snapc;
1746 rbd_dev->header.snap_names = h.snap_names;
1747 rbd_dev->header.snap_names_len = h.snap_names_len;
1748 rbd_dev->header.snap_sizes = h.snap_sizes;
1749 /* Free the extra copy of the object prefix */
1750 WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1751 kfree(h.object_prefix);
1753 ret = __rbd_init_snaps_header(rbd_dev);
1755 up_write(&rbd_dev->header_rwsem);
/* Serialized wrapper: refresh the header while holding ctl_mutex. */
1760 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1764 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1765 ret = __rbd_refresh_header(rbd_dev, hver);
1766 mutex_unlock(&ctl_mutex);
/*
 * Fetch the image header, pick the mapped snapshot, and set up the
 * gendisk plus its request queue.  I/O limits are sized so a single
 * request never spans more than one RADOS object (segment_size bytes).
 * Returns 0 on success or a negative errno.
 */
1771 static int rbd_init_disk(struct rbd_device *rbd_dev)
1773 struct gendisk *disk;
1774 struct request_queue *q;
1779 /* contact OSD, request size info about the object being mapped */
1780 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1784 /* no need to lock here, as rbd_dev is not registered yet */
1785 rc = __rbd_init_snaps_header(rbd_dev);
/* resolves the mapped snapshot and yields the size to expose */
1789 rc = rbd_header_set_snap(rbd_dev, &total_size);
1793 /* create gendisk info */
1795 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1799 snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1801 disk->major = rbd_dev->major;
1802 disk->first_minor = 0;
1803 disk->fops = &rbd_bd_ops;
1804 disk->private_data = rbd_dev;
1808 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1812 /* We use the default size, but let's be explicit about it. */
1813 blk_queue_physical_block_size(q, SECTOR_SIZE);
1815 /* set io sizes to object size */
1816 segment_size = rbd_obj_bytes(&rbd_dev->header);
1817 blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1818 blk_queue_max_segment_size(q, segment_size);
1819 blk_queue_io_min(q, segment_size);
1820 blk_queue_io_opt(q, segment_size);
/* keep bios from straddling object boundaries */
1822 blk_queue_merge_bvec(q, rbd_merge_bvec);
1825 q->queuedata = rbd_dev;
1827 rbd_dev->disk = disk;
1830 /* finally, announce the disk to the world */
1831 set_capacity(disk, total_size / SECTOR_SIZE);
1834 pr_info("%s: added with size 0x%llx\n",
1835 disk->disk_name, (unsigned long long)total_size);
/* Map a struct device embedded in an rbd_device back to its container. */
1848 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1850 return container_of(dev, struct rbd_device, dev);
/* sysfs "size": mapped image size in bytes, read under header_rwsem. */
1853 static ssize_t rbd_size_show(struct device *dev,
1854 struct device_attribute *attr, char *buf)
1856 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1859 down_read(&rbd_dev->header_rwsem);
1860 size = get_capacity(rbd_dev->disk);
1861 up_read(&rbd_dev->header_rwsem);
1863 return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
/* sysfs "major": the block device major number assigned at map time. */
1866 static ssize_t rbd_major_show(struct device *dev,
1867 struct device_attribute *attr, char *buf)
1869 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1871 return sprintf(buf, "%d\n", rbd_dev->major);
/* sysfs "client_id": the ceph client instance id, e.g. "client1234". */
1874 static ssize_t rbd_client_id_show(struct device *dev,
1875 struct device_attribute *attr, char *buf)
1877 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1879 return sprintf(buf, "client%lld\n",
1880 ceph_client_id(rbd_dev->rbd_client->client));
/* sysfs "pool": name of the RADOS pool holding the image. */
1883 static ssize_t rbd_pool_show(struct device *dev,
1884 struct device_attribute *attr, char *buf)
1886 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888 return sprintf(buf, "%s\n", rbd_dev->pool_name);
/* sysfs "pool_id": numeric id of the RADOS pool holding the image. */
1891 static ssize_t rbd_pool_id_show(struct device *dev,
1892 struct device_attribute *attr, char *buf)
1894 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1896 return sprintf(buf, "%d\n", rbd_dev->pool_id);
/* sysfs "name": the rbd image name within the pool. */
1899 static ssize_t rbd_name_show(struct device *dev,
1900 struct device_attribute *attr, char *buf)
1902 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1904 return sprintf(buf, "%s\n", rbd_dev->image_name);
/* sysfs "current_snap": mapped snapshot name ("-" for the head). */
1907 static ssize_t rbd_snap_show(struct device *dev,
1908 struct device_attribute *attr,
1911 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913 return sprintf(buf, "%s\n", rbd_dev->snap_name);
/* sysfs "refresh" (write-only): force a re-read of the image header. */
1916 static ssize_t rbd_image_refresh(struct device *dev,
1917 struct device_attribute *attr,
1921 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1924 ret = rbd_refresh_header(rbd_dev, NULL);
/* report the whole write consumed on success */
1926 return ret < 0 ? ret : size;
/*
 * Per-device sysfs attributes.  All are read-only except "refresh"
 * and "create_snap", which are write-only control files.
 */
1929 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1930 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1931 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1932 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1933 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1934 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1935 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1936 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1937 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
/* Attribute list wired into rbd_device_type via rbd_attr_groups below. */
1939 static struct attribute *rbd_attrs[] = {
1940 &dev_attr_size.attr,
1941 &dev_attr_major.attr,
1942 &dev_attr_client_id.attr,
1943 &dev_attr_pool.attr,
1944 &dev_attr_pool_id.attr,
1945 &dev_attr_name.attr,
1946 &dev_attr_current_snap.attr,
1947 &dev_attr_refresh.attr,
1948 &dev_attr_create_snap.attr,
1952 static struct attribute_group rbd_attr_group = {
1956 static const struct attribute_group *rbd_attr_groups[] = {
/*
 * NOTE(review): body elided in this listing; real teardown appears to
 * happen via the dev->release set in rbd_bus_add_dev() -- confirm.
 */
1961 static void rbd_sysfs_dev_release(struct device *dev)
1965 static struct device_type rbd_device_type = {
1967 .groups = rbd_attr_groups,
1968 .release = rbd_sysfs_dev_release,
/* sysfs "snap_size": size of this snapshot, in bytes. */
1976 static ssize_t rbd_snap_size_show(struct device *dev,
1977 struct device_attribute *attr,
1980 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1982 return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
/* sysfs "snap_id": the snapshot's RADOS snapshot id. */
1985 static ssize_t rbd_snap_id_show(struct device *dev,
1986 struct device_attribute *attr,
1989 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1991 return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
/* Read-only sysfs attributes exposed on each snapshot device. */
1994 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1995 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1997 static struct attribute *rbd_snap_attrs[] = {
1998 &dev_attr_snap_size.attr,
1999 &dev_attr_snap_id.attr,
2003 static struct attribute_group rbd_snap_attr_group = {
2004 .attrs = rbd_snap_attrs,
/*
 * Release callback for a snapshot device.  NOTE(review): the body
 * beyond the container_of is elided; presumably frees snap->name and
 * the rbd_snap itself -- confirm against the full source.
 */
2007 static void rbd_snap_dev_release(struct device *dev)
2009 struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2014 static const struct attribute_group *rbd_snap_attr_groups[] = {
2015 &rbd_snap_attr_group,
2019 static struct device_type rbd_snap_device_type = {
2020 .groups = rbd_snap_attr_groups,
2021 .release = rbd_snap_dev_release,
/*
 * Remove a snapshot from its device's list and unregister its sysfs
 * device; the rbd_snap itself is freed via the device release callback.
 */
2024 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2026 list_del(&snap->node);
2027 device_unregister(&snap->dev);
/*
 * Register a snapshot's struct device under its parent rbd device,
 * named "snap_<name>".  Returns the device_register() result.
 */
2030 static int rbd_register_snap_dev(struct rbd_snap *snap,
2031 struct device *parent)
2033 struct device *dev = &snap->dev;
2036 dev->type = &rbd_snap_device_type;
2037 dev->parent = parent;
2038 dev->release = rbd_snap_dev_release;
2039 dev_set_name(dev, "snap_%s", snap->name);
2040 ret = device_register(dev);
/*
 * Allocate an rbd_snap for snapshot slot i of the current header,
 * copying its name, size and snapshot id.  A sysfs device is created
 * for it only when the parent rbd device is itself already registered.
 * Returns the new snapshot, or an ERR_PTR on failure.
 */
2045 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2046 int i, const char *name)
2048 struct rbd_snap *snap;
2051 snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2053 return ERR_PTR(-ENOMEM);
2056 snap->name = kstrdup(name, GFP_KERNEL);
2060 snap->size = rbd_dev->header.snap_sizes[i];
2061 snap->id = rbd_dev->header.snapc->snaps[i];
2062 if (device_is_registered(&rbd_dev->dev)) {
2063 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
/* error path: unwind the allocations above (lines elided here) */
2074 return ERR_PTR(ret);
2078 * Scan the rbd device's current snapshot list and compare it to the
2079 * newly-received snapshot context. Remove any existing snapshots
2080 * not present in the new snapshot context. Add a new snapshot for
2081 * any snapshots in the snapshot context not in the current list.
2082 * And verify there are no changes to snapshots we already know
2085 * Assumes the snapshots in the snapshot context are sorted by
2086 * snapshot id, highest id first. (Snapshots in the rbd_dev's list
2087 * are also maintained in that order.)
2089 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2091 struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2092 const u32 snap_count = snapc->num_snaps;
2093 char *snap_name = rbd_dev->header.snap_names;
2094 struct list_head *head = &rbd_dev->snaps;
2095 struct list_head *links = head->next;
/* merge-walk the existing list and the new context in parallel */
2098 while (index < snap_count || links != head) {
2100 struct rbd_snap *snap;
/* CEPH_NOSNAP marks "ran out of entries" on the context side */
2102 snap_id = index < snap_count ? snapc->snaps[index]
2104 snap = links != head ? list_entry(links, struct rbd_snap, node)
2106 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2108 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2109 struct list_head *next = links->next;
2111 /* Existing snapshot not in the new snap context */
/* the currently-mapped snapshot was deleted on the server */
2113 if (rbd_dev->snap_id == snap->id)
2114 rbd_dev->snap_exists = false;
2115 __rbd_remove_snap_dev(snap);
2117 /* Done with this list entry; advance */
2123 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2124 struct rbd_snap *new_snap;
2126 /* We haven't seen this snapshot before */
2128 new_snap = __rbd_add_snap_dev(rbd_dev, index,
2130 if (IS_ERR(new_snap))
2131 return PTR_ERR(new_snap);
2133 /* New goes before existing, or at end of list */
2136 list_add_tail(&new_snap->node, &snap->node);
2138 list_add(&new_snap->node, head);
2140 /* Already have this one */
/* sanity: a known snapshot's size and name must not change */
2142 BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2143 BUG_ON(strcmp(snap->name, snap_name));
2145 /* Done with this list entry; advance */
2147 links = links->next;
2150 /* Advance to the next entry in the snapshot context */
/* names are stored consecutively, NUL-separated */
2153 snap_name += strlen(snap_name) + 1;
/*
 * Register the rbd device (and all of its snapshots) with the driver
 * core under the rbd bus, named by its numeric id.  ctl_mutex is held
 * across the registrations.
 */
2159 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2163 struct rbd_snap *snap;
2165 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2166 dev = &rbd_dev->dev;
2168 dev->bus = &rbd_bus_type;
2169 dev->type = &rbd_device_type;
2170 dev->parent = &rbd_root_dev;
/* final teardown of the rbd_dev runs from this release callback */
2171 dev->release = rbd_dev_release;
2172 dev_set_name(dev, "%d", rbd_dev->dev_id);
2173 ret = device_register(dev);
2177 list_for_each_entry(snap, &rbd_dev->snaps, node) {
2178 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2183 mutex_unlock(&ctl_mutex);
/* Unregister from sysfs; final cleanup runs via rbd_dev_release(). */
2187 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2189 device_unregister(&rbd_dev->dev);
/*
 * Establish the watch on the header object, refreshing the cached
 * header and retrying for as long as the watch request returns -ERANGE.
 */
2192 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2197 ret = rbd_req_sync_watch(rbd_dev);
2198 if (ret == -ERANGE) {
2199 rc = rbd_refresh_header(rbd_dev, NULL);
2203 } while (ret == -ERANGE);
/* Highest device id handed out so far; ids therefore start at 1. */
2208 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2211 * Get a unique rbd identifier for the given new rbd_dev, and add
2212 * the rbd_dev to the global list. The minimum rbd id is 1.
2214 static void rbd_id_get(struct rbd_device *rbd_dev)
2216 rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2218 spin_lock(&rbd_dev_list_lock);
2219 list_add_tail(&rbd_dev->node, &rbd_dev_list);
2220 spin_unlock(&rbd_dev_list_lock);
2224 * Remove an rbd_dev from the global list, and record that its
2225 * identifier is no longer in use.
2227 static void rbd_id_put(struct rbd_device *rbd_dev)
2229 struct list_head *tmp;
2230 int rbd_id = rbd_dev->dev_id;
2235 spin_lock(&rbd_dev_list_lock);
2236 list_del_init(&rbd_dev->node);
2239 * If the id being "put" is not the current maximum, there
2240 * is nothing special we need to do.
2242 if (rbd_id != atomic64_read(&rbd_id_max)) {
2243 spin_unlock(&rbd_dev_list_lock);
2248 * We need to update the current maximum id. Search the
2249 * list to find out what it is. We're more likely to find
2250 * the maximum at the end, so search the list backward.
2253 list_for_each_prev(tmp, &rbd_dev_list) {
2254 struct rbd_device *rbd_dev;
2256 rbd_dev = list_entry(tmp, struct rbd_device, node);
/*
 * NOTE(review): this compares the id being released (rbd_id)
 * rather than this entry's rbd_dev->dev_id; for a max search
 * over the remaining devices the entry's own id should be
 * examined -- confirm against the full source.
 */
2257 if (rbd_id > max_id)
2260 spin_unlock(&rbd_dev_list_lock);
2263 * The max id could have been updated by rbd_id_get(), in
2264 * which case it now accurately reflects the new maximum.
2265 * Be careful not to overwrite the maximum value in that
2268 atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2272 * Skips over white space at *buf, and updates *buf to point to the
2273 * first found non-space character (if any). Returns the length of
2274 * the token (string of non-white space characters) found. Note
2275 * that *buf must be terminated with '\0'.
2277 static inline size_t next_token(const char **buf)
2280 * These are the characters that produce nonzero for
2281 * isspace() in the "C" and "POSIX" locales.
/* note: the set deliberately begins with the plain space character */
2283 const char *spaces = " \f\n\r\t\v";
2285 *buf += strspn(*buf, spaces); /* Find start of token */
2287 return strcspn(*buf, spaces); /* Return token length */
2291 * Finds the next token in *buf, and if the provided token buffer is
2292 * big enough, copies the found token into it. The result, if
2293 * copied, is guaranteed to be terminated with '\0'. Note that *buf
2294 * must be terminated with '\0' on entry.
2296 * Returns the length of the token found (not including the '\0').
2297 * Return value will be 0 if no token is found, and it will be >=
2298 * token_size if the token would not fit.
2300 * The *buf pointer will be updated to point beyond the end of the
2301 * found token. Note that this occurs even if the token buffer is
2302 * too small to hold it.
2304 static inline size_t copy_token(const char **buf,
2310 len = next_token(buf);
/* copy only when it fits with room for the terminating NUL */
2311 if (len < token_size) {
2312 memcpy(token, *buf, len);
2313 *(token + len) = '\0';
2321 * Finds the next token in *buf, dynamically allocates a buffer big
2322 * enough to hold a copy of it, and copies the token into the new
2323 * buffer. The copy is guaranteed to be terminated with '\0'. Note
2324 * that a duplicate buffer is created even for a zero-length token.
2326 * Returns a pointer to the newly-allocated duplicate, or a null
2327 * pointer if memory for the duplicate was not available. If
2328 * the lenp argument is a non-null pointer, the length of the token
2329 * (not including the '\0') is returned in *lenp.
2331 * If successful, the *buf pointer will be updated to point beyond
2332 * the end of the found token.
2334 * Note: uses GFP_KERNEL for allocation.
2336 static inline char *dup_token(const char **buf, size_t *lenp)
2341 len = next_token(buf);
/* +1 for the terminating NUL appended below */
2342 dup = kmalloc(len + 1, GFP_KERNEL);
2346 memcpy(dup, *buf, len);
2347 *(dup + len) = '\0';
2357 * This fills in the pool_name, image_name, image_name_len, snap_name,
2358 * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2359 * on the list of monitor addresses and other options provided via
2362 * Note: rbd_dev is assumed to have been initially zero-filled.
/*
 * Returns 0 on success; on failure every string allocated here is
 * freed and reset so the caller can bail out unconditionally.
 */
2364 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2366 const char **mon_addrs,
2367 size_t *mon_addrs_size,
2369 size_t options_size)
2374 /* The first four tokens are required */
2376 len = next_token(&buf);
/* +1 accounts for the terminating NUL the caller will need */
2379 *mon_addrs_size = len + 1;
2384 len = copy_token(&buf, options, options_size);
2385 if (!len || len >= options_size)
2389 rbd_dev->pool_name = dup_token(&buf, NULL);
2390 if (!rbd_dev->pool_name)
2393 rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2394 if (!rbd_dev->image_name)
2397 /* Create the name of the header object */
2399 rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2400 + sizeof (RBD_SUFFIX),
2402 if (!rbd_dev->header_name)
2404 sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2407 * The snapshot name is optional. If none is supplied,
2408 * we use the default value.
2410 rbd_dev->snap_name = dup_token(&buf, &len);
2411 if (!rbd_dev->snap_name)
2414 /* Replace the empty name with the default */
2415 kfree(rbd_dev->snap_name)
2417 = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2418 if (!rbd_dev->snap_name)
2421 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2422 sizeof (RBD_SNAP_HEAD_NAME));
/* error unwind: free everything allocated above, in reverse order */
2428 kfree(rbd_dev->header_name);
2429 rbd_dev->header_name = NULL;
2430 kfree(rbd_dev->image_name);
2431 rbd_dev->image_name = NULL;
2432 rbd_dev->image_name_len = 0;
2433 kfree(rbd_dev->pool_name);
2434 rbd_dev->pool_name = NULL;
/*
 * Bus-level "add" control file: parse the map request, connect to the
 * cluster, resolve the pool, register a block device and bring the
 * mapping online.  On success returns the number of bytes consumed;
 * on failure unwinds everything acquired so far and returns -errno.
 */
2439 static ssize_t rbd_add(struct bus_type *bus,
2444 struct rbd_device *rbd_dev = NULL;
2445 const char *mon_addrs = NULL;
2446 size_t mon_addrs_size = 0;
2447 struct ceph_osd_client *osdc;
/* pin the module while a mapping exists; dropped in rbd_dev_release() */
2450 if (!try_module_get(THIS_MODULE))
2453 options = kmalloc(count, GFP_KERNEL);
2456 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460 /* static rbd_device initialization */
2461 spin_lock_init(&rbd_dev->lock);
2462 INIT_LIST_HEAD(&rbd_dev->node);
2463 INIT_LIST_HEAD(&rbd_dev->snaps);
2464 init_rwsem(&rbd_dev->header_rwsem);
2466 /* generate unique id: find highest unique id, add one */
2467 rbd_id_get(rbd_dev);
2469 /* Fill in the device name, now that we have its id. */
2470 BUILD_BUG_ON(DEV_NAME_LEN
2471 < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2472 sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2474 /* parse add command */
2475 rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
/* mon_addrs_size includes the NUL, hence the -1 for the length */
2480 rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2482 if (IS_ERR(rbd_dev->rbd_client)) {
2483 rc = PTR_ERR(rbd_dev->rbd_client);
2484 rbd_dev->rbd_client = NULL;
2489 osdc = &rbd_dev->rbd_client->client->osdc;
2490 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2492 goto err_out_client;
2493 rbd_dev->pool_id = rc;
2495 /* register our block device */
/* major 0 requests dynamic major allocation; result is the major */
2496 rc = register_blkdev(0, rbd_dev->name);
2498 goto err_out_client;
2499 rbd_dev->major = rc;
2501 rc = rbd_bus_add_dev(rbd_dev);
2503 goto err_out_blkdev;
2506 * At this point cleanup in the event of an error is the job
2507 * of the sysfs code (initiated by rbd_bus_del_dev()).
2509 * Set up and announce blkdev mapping.
2511 rc = rbd_init_disk(rbd_dev);
2515 rc = rbd_init_watch_dev(rbd_dev);
2522 /* this will also clean up rest of rbd_dev stuff */
2524 rbd_bus_del_dev(rbd_dev);
/* goto-style unwind for failures before device registration */
2529 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2531 rbd_put_client(rbd_dev);
/* pool_name set implies parse_args succeeded, so free its strings */
2533 if (rbd_dev->pool_name) {
2534 kfree(rbd_dev->snap_name);
2535 kfree(rbd_dev->header_name);
2536 kfree(rbd_dev->image_name);
2537 kfree(rbd_dev->pool_name);
2539 rbd_id_put(rbd_dev);
2544 dout("Error adding device %s\n", buf);
2545 module_put(THIS_MODULE);
2547 return (ssize_t) rc;
/*
 * Look up a mapped rbd device by numeric id.  Takes and drops
 * rbd_dev_list_lock around the scan; the pointer is returned without
 * an extra reference -- callers presumably rely on ctl_mutex to keep
 * the device alive (confirm against full source).
 */
2550 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2552 struct list_head *tmp;
2553 struct rbd_device *rbd_dev;
2555 spin_lock(&rbd_dev_list_lock);
2556 list_for_each(tmp, &rbd_dev_list) {
2557 rbd_dev = list_entry(tmp, struct rbd_device, node);
2558 if (rbd_dev->dev_id == dev_id) {
2559 spin_unlock(&rbd_dev_list_lock);
2563 spin_unlock(&rbd_dev_list_lock);
/*
 * Driver-core release callback, run when the last reference to the
 * rbd device's struct device is dropped: tear down the header watch,
 * the disk and blkdev registration, free the name strings, release
 * the id, and finally drop the module reference taken in rbd_add().
 */
2567 static void rbd_dev_release(struct device *dev)
2569 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2571 if (rbd_dev->watch_request) {
2572 struct ceph_client *client = rbd_dev->rbd_client->client;
2574 ceph_osdc_unregister_linger_request(&client->osdc,
2575 rbd_dev->watch_request);
2577 if (rbd_dev->watch_event)
2578 rbd_req_sync_unwatch(rbd_dev);
2580 rbd_put_client(rbd_dev);
2582 /* clean up and free blkdev */
2583 rbd_free_disk(rbd_dev);
2584 unregister_blkdev(rbd_dev->major, rbd_dev->name);
2586 /* done with the id, and with the rbd_dev */
2587 kfree(rbd_dev->snap_name);
2588 kfree(rbd_dev->header_name);
2589 kfree(rbd_dev->pool_name);
2590 kfree(rbd_dev->image_name);
2591 rbd_id_put(rbd_dev);
2594 /* release module ref */
2595 module_put(THIS_MODULE);
/*
 * Bus-level "remove" control file: parse the device id from the
 * write, look the device up under ctl_mutex, and unregister it along
 * with all of its snapshot devices.
 */
2598 static ssize_t rbd_remove(struct bus_type *bus,
2602 struct rbd_device *rbd_dev = NULL;
2607 rc = strict_strtoul(buf, 10, &ul);
2611 /* convert to int; abort if we lost anything in the conversion */
2612 target_id = (int) ul;
2613 if (target_id != ul)
2616 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2618 rbd_dev = __rbd_get_dev(target_id);
2624 __rbd_remove_all_snaps(rbd_dev);
2625 rbd_bus_del_dev(rbd_dev);
2628 mutex_unlock(&ctl_mutex);
/*
 * Per-device "create_snap" control file: create a snapshot named by
 * the written string, refresh the cached header, then notify watchers
 * outside ctl_mutex.
 */
2632 static ssize_t rbd_snap_add(struct device *dev,
2633 struct device_attribute *attr,
2637 struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2639 char *name = kmalloc(count + 1, GFP_KERNEL);
/*
 * NOTE(review): snprintf() with size "count" copies at most count-1
 * characters, so the last byte of buf (often the trailing newline
 * from "echo") is dropped.  Confirm that trim is the intent.
 */
2643 snprintf(name, count, "%s", buf);
2645 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2647 ret = rbd_header_add_snap(rbd_dev,
2652 ret = __rbd_refresh_header(rbd_dev, NULL);
2656 /* shouldn't hold ctl_mutex when notifying.. notify might
2657 trigger a watch callback that would need to get that mutex */
2658 mutex_unlock(&ctl_mutex);
2660 /* make a best effort, don't error if failed */
2661 rbd_req_sync_notify(rbd_dev);
2668 mutex_unlock(&ctl_mutex);
2674 * create control files in sysfs
/*
 * Register the rbd root device and the rbd bus; the root device is
 * unwound if bus registration fails.
 */
2677 static int rbd_sysfs_init(void)
2681 ret = device_register(&rbd_root_dev);
2685 ret = bus_register(&rbd_bus_type);
2687 device_unregister(&rbd_root_dev);
/* Mirror of rbd_sysfs_init(): unregister the bus, then the root device. */
2692 static void rbd_sysfs_cleanup(void)
2694 bus_unregister(&rbd_bus_type);
2695 device_unregister(&rbd_root_dev);
/* Module init: set up the sysfs plumbing and announce the driver. */
2698 int __init rbd_init(void)
2702 rc = rbd_sysfs_init();
2705 pr_info("loaded " RBD_DRV_NAME_LONG "\n");
/* Module exit: remove the sysfs control files registered at init. */
2709 void __exit rbd_exit(void)
2711 rbd_sysfs_cleanup();
2714 module_init(rbd_init);
2715 module_exit(rbd_exit);
2717 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2718 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2719 MODULE_DESCRIPTION("rados block device");
2721 /* following authorship retained from original osdblk.c */
2722 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2724 MODULE_LICENSE("GPL");