rbd: have rbd_obj_method_sync() return transfer count
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define RBD_DEBUG       /* When defined, rbd_assert() checks are compiled in */

/*
 * Block I/O is expressed in sectors.  The sector shows up in several
 * Linux layers (blk, bio, genhd) and is 512 bytes by default in all
 * of them.  These names just read better than the raw numbers.
 */
#define SECTOR_SHIFT    9
#define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)

/* Short and long forms of the driver name */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/*
 * Longest snapshot name: NAME_MAX less the length of the device
 * name prefix (the prefix's terminating NUL is not counted).
 */
#define RBD_SNAP_DEV_NAME_PREFIX        "snap_"
#define RBD_MAX_SNAP_NAME_LEN   \
                        (NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))

#define RBD_MAX_SNAP_COUNT      510     /* allows max snapc to fit in 4KB */

/* Snapshot name that stands for the image head (CEPH_NOSNAP) */
#define RBD_SNAP_HEAD_NAME      "-"

/* An image name sent by an OSD is sized so it fits in a single page */
#define RBD_IMAGE_NAME_LEN_MAX  (PAGE_SIZE - sizeof (__le32) - 1)
#define RBD_IMAGE_ID_LEN_MAX    64

#define RBD_OBJ_PREFIX_LEN_MAX  64

/* Image feature bits */

#define RBD_FEATURE_LAYERING    (1 << 0)
#define RBD_FEATURE_STRIPINGV2  (1 << 1)
#define RBD_FEATURES_ALL \
            (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)

/* The subset of features this client implementation supports (none). */

#define RBD_FEATURES_SUPPORTED  (0)

/*
 * Device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer id.  MAX_INT_FORMAT_WIDTH is used to make sure
 * DEV_NAME_LEN is large enough for every possible device name.
 */
#define DEV_NAME_LEN            32
#define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
93
94 /*
95  * block device image metadata (in-memory version)
96  */
97 struct rbd_image_header {
98         /* These four fields never change for a given rbd image */
99         char *object_prefix;
100         u64 features;
101         __u8 obj_order;
102         __u8 crypt_type;
103         __u8 comp_type;
104
105         /* The remaining fields need to be updated occasionally */
106         u64 image_size;
107         struct ceph_snap_context *snapc;
108         char *snap_names;
109         u64 *snap_sizes;
110
111         u64 obj_version;
112 };
113
114 /*
115  * An rbd image specification.
116  *
117  * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
118  * identify an image.  Each rbd_dev structure includes a pointer to
119  * an rbd_spec structure that encapsulates this identity.
120  *
121  * Each of the id's in an rbd_spec has an associated name.  For a
122  * user-mapped image, the names are supplied and the id's associated
123  * with them are looked up.  For a layered image, a parent image is
124  * defined by the tuple, and the names are looked up.
125  *
126  * An rbd_dev structure contains a parent_spec pointer which is
127  * non-null if the image it represents is a child in a layered
128  * image.  This pointer will refer to the rbd_spec structure used
129  * by the parent rbd_dev for its own identity (i.e., the structure
130  * is shared between the parent and child).
131  *
132  * Since these structures are populated once, during the discovery
133  * phase of image construction, they are effectively immutable so
134  * we make no effort to synchronize access to them.
135  *
136  * Note that code herein does not assume the image name is known (it
137  * could be a null pointer).
138  */
139 struct rbd_spec {
140         u64             pool_id;
141         char            *pool_name;
142
143         char            *image_id;
144         char            *image_name;
145
146         u64             snap_id;
147         char            *snap_name;
148
149         struct kref     kref;
150 };
151
152 /*
153  * an instance of the client.  multiple devices may share an rbd client.
154  */
155 struct rbd_client {
156         struct ceph_client      *client;
157         struct kref             kref;
158         struct list_head        node;
159 };
160
/* Completion callback type for image requests */
struct rbd_img_request;
typedef void (*rbd_img_callback_t)(struct rbd_img_request *);

#define BAD_WHICH       U32_MAX         /* Good which or bad which, which? */

/* Completion callback type for object requests */
struct rbd_obj_request;
typedef void (*rbd_obj_callback_t)(struct rbd_obj_request *);

/* Selects which data member of an object request is in use */
enum obj_request_type {
        OBJ_REQUEST_NODATA,
        OBJ_REQUEST_BIO,
        OBJ_REQUEST_PAGES
};

/* Bit numbers for rbd_obj_request->flags */
enum obj_req_flags {
        OBJ_REQ_DONE,           /* completion: 0 = not done, 1 = done */
        OBJ_REQ_IMG_DATA,       /* usage: 0 = standalone, 1 = image data */
        OBJ_REQ_KNOWN,          /* is EXISTS flag valid: 0 = no, 1 = yes */
        OBJ_REQ_EXISTS,         /* does target exist: 0 = no, 1 = yes */
};
179
180 struct rbd_obj_request {
181         const char              *object_name;
182         u64                     offset;         /* object start byte */
183         u64                     length;         /* bytes from offset */
184         unsigned long           flags;
185
186         /*
187          * An object request associated with an image will have its
188          * img_data flag set; a standalone object request will not.
189          *
190          * A standalone object request will have which == BAD_WHICH
191          * and a null obj_request pointer.
192          *
193          * An object request initiated in support of a layered image
194          * object (to check for its existence before a write) will
195          * have which == BAD_WHICH and a non-null obj_request pointer.
196          *
197          * Finally, an object request for rbd image data will have
198          * which != BAD_WHICH, and will have a non-null img_request
199          * pointer.  The value of which will be in the range
200          * 0..(img_request->obj_request_count-1).
201          */
202         union {
203                 struct rbd_obj_request  *obj_request;   /* STAT op */
204                 struct {
205                         struct rbd_img_request  *img_request;
206                         u64                     img_offset;
207                         /* links for img_request->obj_requests list */
208                         struct list_head        links;
209                 };
210         };
211         u32                     which;          /* posn image request list */
212
213         enum obj_request_type   type;
214         union {
215                 struct bio      *bio_list;
216                 struct {
217                         struct page     **pages;
218                         u32             page_count;
219                 };
220         };
221         struct page             **copyup_pages;
222
223         struct ceph_osd_request *osd_req;
224
225         u64                     xferred;        /* bytes transferred */
226         u64                     version;
227         int                     result;
228
229         rbd_obj_callback_t      callback;
230         struct completion       completion;
231
232         struct kref             kref;
233 };
234
/* Bit numbers for rbd_img_request->flags */
enum img_req_flags {
        IMG_REQ_WRITE,          /* direction: 0 = read, 1 = write */
        IMG_REQ_CHILD,          /* initiator: 0 = block, 1 = child image */
        IMG_REQ_LAYERED,        /* ENOENT handling: 0 = normal, 1 = layered */
};
240
241 struct rbd_img_request {
242         struct rbd_device       *rbd_dev;
243         u64                     offset; /* starting image byte offset */
244         u64                     length; /* byte count from offset */
245         unsigned long           flags;
246         union {
247                 u64                     snap_id;        /* for reads */
248                 struct ceph_snap_context *snapc;        /* for writes */
249         };
250         union {
251                 struct request          *rq;            /* block request */
252                 struct rbd_obj_request  *obj_request;   /* obj req initiator */
253         };
254         struct page             **copyup_pages;
255         spinlock_t              completion_lock;/* protects next_completion */
256         u32                     next_completion;
257         rbd_img_callback_t      callback;
258         u64                     xferred;/* aggregate bytes transferred */
259         int                     result; /* first nonzero obj_request result */
260
261         u32                     obj_request_count;
262         struct list_head        obj_requests;   /* rbd_obj_request structs */
263
264         struct kref             kref;
265 };
266
/*
 * Iterators over the object requests linked on an image request's
 * obj_requests list.  Note the "safe" variant walks the list in
 * reverse order.
 */
#define for_each_obj_request(ireq, oreq) \
        list_for_each_entry(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_from(ireq, oreq) \
        list_for_each_entry_from(oreq, &(ireq)->obj_requests, links)
#define for_each_obj_request_safe(ireq, oreq, n) \
        list_for_each_entry_safe_reverse(oreq, n, &(ireq)->obj_requests, links)
273
274 struct rbd_snap {
275         struct  device          dev;
276         const char              *name;
277         u64                     size;
278         struct list_head        node;
279         u64                     id;
280         u64                     features;
281 };
282
283 struct rbd_mapping {
284         u64                     size;
285         u64                     features;
286         bool                    read_only;
287 };
288
289 /*
290  * a single device
291  */
292 struct rbd_device {
293         int                     dev_id;         /* blkdev unique id */
294
295         int                     major;          /* blkdev assigned major */
296         struct gendisk          *disk;          /* blkdev's gendisk and rq */
297
298         u32                     image_format;   /* Either 1 or 2 */
299         struct rbd_client       *rbd_client;
300
301         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
302
303         spinlock_t              lock;           /* queue, flags, open_count */
304
305         struct rbd_image_header header;
306         unsigned long           flags;          /* possibly lock protected */
307         struct rbd_spec         *spec;
308
309         char                    *header_name;
310
311         struct ceph_file_layout layout;
312
313         struct ceph_osd_event   *watch_event;
314         struct rbd_obj_request  *watch_request;
315
316         struct rbd_spec         *parent_spec;
317         u64                     parent_overlap;
318         struct rbd_device       *parent;
319
320         /* protects updating the header */
321         struct rw_semaphore     header_rwsem;
322
323         struct rbd_mapping      mapping;
324
325         struct list_head        node;
326
327         /* list of snapshots */
328         struct list_head        snaps;
329
330         /* sysfs related */
331         struct device           dev;
332         unsigned long           open_count;     /* protected by lock */
333 };
334
/*
 * Bit numbers for rbd_dev->flags.  Where atomic access is needed,
 * rbd_dev->lock protects it.
 *
 * At present only the "removing" flag, which is tied to the
 * "open_count" field, needs atomic access.
 */
enum rbd_dev_flags {
        RBD_DEV_FLAG_EXISTS,    /* mapped snapshot has not been deleted */
        RBD_DEV_FLAG_REMOVING,  /* this mapping is being removed */
};
346
347 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
348
349 static LIST_HEAD(rbd_dev_list);    /* devices */
350 static DEFINE_SPINLOCK(rbd_dev_list_lock);
351
352 static LIST_HEAD(rbd_client_list);              /* clients */
353 static DEFINE_SPINLOCK(rbd_client_list_lock);
354
355 static int rbd_img_request_submit(struct rbd_img_request *img_request);
356
357 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev);
358 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
359
360 static void rbd_dev_release(struct device *dev);
361 static void rbd_remove_snap_dev(struct rbd_snap *snap);
362
363 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
364                        size_t count);
365 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
366                           size_t count);
367 static int rbd_dev_probe(struct rbd_device *rbd_dev);
368
369 static struct bus_attribute rbd_bus_attrs[] = {
370         __ATTR(add, S_IWUSR, NULL, rbd_add),
371         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
372         __ATTR_NULL
373 };
374
375 static struct bus_type rbd_bus_type = {
376         .name           = "rbd",
377         .bus_attrs      = rbd_bus_attrs,
378 };
379
380 static void rbd_root_dev_release(struct device *dev)
381 {
382 }
383
384 static struct device rbd_root_dev = {
385         .init_name =    "rbd",
386         .release =      rbd_root_dev_release,
387 };
388
389 static __printf(2, 3)
390 void rbd_warn(struct rbd_device *rbd_dev, const char *fmt, ...)
391 {
392         struct va_format vaf;
393         va_list args;
394
395         va_start(args, fmt);
396         vaf.fmt = fmt;
397         vaf.va = &args;
398
399         if (!rbd_dev)
400                 printk(KERN_WARNING "%s: %pV\n", RBD_DRV_NAME, &vaf);
401         else if (rbd_dev->disk)
402                 printk(KERN_WARNING "%s: %s: %pV\n",
403                         RBD_DRV_NAME, rbd_dev->disk->disk_name, &vaf);
404         else if (rbd_dev->spec && rbd_dev->spec->image_name)
405                 printk(KERN_WARNING "%s: image %s: %pV\n",
406                         RBD_DRV_NAME, rbd_dev->spec->image_name, &vaf);
407         else if (rbd_dev->spec && rbd_dev->spec->image_id)
408                 printk(KERN_WARNING "%s: id %s: %pV\n",
409                         RBD_DRV_NAME, rbd_dev->spec->image_id, &vaf);
410         else    /* punt */
411                 printk(KERN_WARNING "%s: rbd_dev %p: %pV\n",
412                         RBD_DRV_NAME, rbd_dev, &vaf);
413         va_end(args);
414 }
415
/*
 * rbd_assert(expr): when RBD_DEBUG is defined, report the failed
 * expression together with the function and line, then BUG().
 * Without RBD_DEBUG the macro expands to nothing and expr is not
 * evaluated.
 *
 * The body is wrapped in do { } while (0) so the macro behaves as a
 * single statement; a bare "if { }" body breaks when the macro is
 * used as the unbraced body of an if that has an else.
 */
#ifdef RBD_DEBUG
#define rbd_assert(expr)                                                \
        do {                                                            \
                if (unlikely(!(expr))) {                                \
                        printk(KERN_ERR "\nAssertion failure in %s() "  \
                                                "at line %d:\n\n"       \
                                        "\trbd_assert(%s);\n\n",        \
                                        __func__, __LINE__, #expr);     \
                        BUG();                                          \
                }                                                       \
        } while (0)
#else /* !RBD_DEBUG */
#  define rbd_assert(expr)      ((void) 0)
#endif /* !RBD_DEBUG */
428
429 static void rbd_img_parent_read(struct rbd_obj_request *obj_request);
430 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request);
431
432 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
433 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
434
435 static int rbd_open(struct block_device *bdev, fmode_t mode)
436 {
437         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
438         bool removing = false;
439
440         if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
441                 return -EROFS;
442
443         spin_lock_irq(&rbd_dev->lock);
444         if (test_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags))
445                 removing = true;
446         else
447                 rbd_dev->open_count++;
448         spin_unlock_irq(&rbd_dev->lock);
449         if (removing)
450                 return -ENOENT;
451
452         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
453         (void) get_device(&rbd_dev->dev);
454         set_device_ro(bdev, rbd_dev->mapping.read_only);
455         mutex_unlock(&ctl_mutex);
456
457         return 0;
458 }
459
460 static int rbd_release(struct gendisk *disk, fmode_t mode)
461 {
462         struct rbd_device *rbd_dev = disk->private_data;
463         unsigned long open_count_before;
464
465         spin_lock_irq(&rbd_dev->lock);
466         open_count_before = rbd_dev->open_count--;
467         spin_unlock_irq(&rbd_dev->lock);
468         rbd_assert(open_count_before > 0);
469
470         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
471         put_device(&rbd_dev->dev);
472         mutex_unlock(&ctl_mutex);
473
474         return 0;
475 }
476
477 static const struct block_device_operations rbd_bd_ops = {
478         .owner                  = THIS_MODULE,
479         .open                   = rbd_open,
480         .release                = rbd_release,
481 };
482
483 /*
484  * Initialize an rbd client instance.
485  * We own *ceph_opts.
486  */
487 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts)
488 {
489         struct rbd_client *rbdc;
490         int ret = -ENOMEM;
491
492         dout("%s:\n", __func__);
493         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
494         if (!rbdc)
495                 goto out_opt;
496
497         kref_init(&rbdc->kref);
498         INIT_LIST_HEAD(&rbdc->node);
499
500         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
501
502         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
503         if (IS_ERR(rbdc->client))
504                 goto out_mutex;
505         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
506
507         ret = ceph_open_session(rbdc->client);
508         if (ret < 0)
509                 goto out_err;
510
511         spin_lock(&rbd_client_list_lock);
512         list_add_tail(&rbdc->node, &rbd_client_list);
513         spin_unlock(&rbd_client_list_lock);
514
515         mutex_unlock(&ctl_mutex);
516         dout("%s: rbdc %p\n", __func__, rbdc);
517
518         return rbdc;
519
520 out_err:
521         ceph_destroy_client(rbdc->client);
522 out_mutex:
523         mutex_unlock(&ctl_mutex);
524         kfree(rbdc);
525 out_opt:
526         if (ceph_opts)
527                 ceph_destroy_options(ceph_opts);
528         dout("%s: error %d\n", __func__, ret);
529
530         return ERR_PTR(ret);
531 }
532
533 static struct rbd_client *__rbd_get_client(struct rbd_client *rbdc)
534 {
535         kref_get(&rbdc->kref);
536
537         return rbdc;
538 }
539
540 /*
541  * Find a ceph client with specific addr and configuration.  If
542  * found, bump its reference count.
543  */
544 static struct rbd_client *rbd_client_find(struct ceph_options *ceph_opts)
545 {
546         struct rbd_client *client_node;
547         bool found = false;
548
549         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
550                 return NULL;
551
552         spin_lock(&rbd_client_list_lock);
553         list_for_each_entry(client_node, &rbd_client_list, node) {
554                 if (!ceph_compare_options(ceph_opts, client_node->client)) {
555                         __rbd_get_client(client_node);
556
557                         found = true;
558                         break;
559                 }
560         }
561         spin_unlock(&rbd_client_list_lock);
562
563         return found ? client_node : NULL;
564 }
565
566 /*
567  * mount options
568  */
569 enum {
570         Opt_last_int,
571         /* int args above */
572         Opt_last_string,
573         /* string args above */
574         Opt_read_only,
575         Opt_read_write,
576         /* Boolean args above */
577         Opt_last_bool,
578 };
579
580 static match_table_t rbd_opts_tokens = {
581         /* int args above */
582         /* string args above */
583         {Opt_read_only, "read_only"},
584         {Opt_read_only, "ro"},          /* Alternate spelling */
585         {Opt_read_write, "read_write"},
586         {Opt_read_write, "rw"},         /* Alternate spelling */
587         /* Boolean args above */
588         {-1, NULL}
589 };
590
591 struct rbd_options {
592         bool    read_only;
593 };
594
595 #define RBD_READ_ONLY_DEFAULT   false
596
597 static int parse_rbd_opts_token(char *c, void *private)
598 {
599         struct rbd_options *rbd_opts = private;
600         substring_t argstr[MAX_OPT_ARGS];
601         int token, intval, ret;
602
603         token = match_token(c, rbd_opts_tokens, argstr);
604         if (token < 0)
605                 return -EINVAL;
606
607         if (token < Opt_last_int) {
608                 ret = match_int(&argstr[0], &intval);
609                 if (ret < 0) {
610                         pr_err("bad mount option arg (not int) "
611                                "at '%s'\n", c);
612                         return ret;
613                 }
614                 dout("got int token %d val %d\n", token, intval);
615         } else if (token > Opt_last_int && token < Opt_last_string) {
616                 dout("got string token %d val %s\n", token,
617                      argstr[0].from);
618         } else if (token > Opt_last_string && token < Opt_last_bool) {
619                 dout("got Boolean token %d\n", token);
620         } else {
621                 dout("got token %d\n", token);
622         }
623
624         switch (token) {
625         case Opt_read_only:
626                 rbd_opts->read_only = true;
627                 break;
628         case Opt_read_write:
629                 rbd_opts->read_only = false;
630                 break;
631         default:
632                 rbd_assert(false);
633                 break;
634         }
635         return 0;
636 }
637
/*
 * Return a client matching ceph_opts.  An existing client is reused
 * when rbd_client_find() locates one, in which case the options
 * passed in are destroyed; otherwise a new client is created from
 * them.  The result of rbd_client_create() is returned unchanged,
 * so it may be an ERR_PTR().
 */
static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
{
        struct rbd_client *rbdc = rbd_client_find(ceph_opts);

        if (!rbdc)
                return rbd_client_create(ceph_opts);

        /* using an existing client */
        ceph_destroy_options(ceph_opts);

        return rbdc;
}
654
655 /*
656  * Destroy ceph client
657  *
658  * Caller must hold rbd_client_list_lock.
659  */
660 static void rbd_client_release(struct kref *kref)
661 {
662         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
663
664         dout("%s: rbdc %p\n", __func__, rbdc);
665         spin_lock(&rbd_client_list_lock);
666         list_del(&rbdc->node);
667         spin_unlock(&rbd_client_list_lock);
668
669         ceph_destroy_client(rbdc->client);
670         kfree(rbdc);
671 }
672
673 /*
674  * Drop reference to ceph client node. If it's not referenced anymore, release
675  * it.
676  */
677 static void rbd_put_client(struct rbd_client *rbdc)
678 {
679         if (rbdc)
680                 kref_put(&rbdc->kref, rbd_client_release);
681 }
682
683 static bool rbd_image_format_valid(u32 image_format)
684 {
685         return image_format == 1 || image_format == 2;
686 }
687
688 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
689 {
690         size_t size;
691         u32 snap_count;
692
693         /* The header has to start with the magic rbd header text */
694         if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
695                 return false;
696
697         /* The bio layer requires at least sector-sized I/O */
698
699         if (ondisk->options.order < SECTOR_SHIFT)
700                 return false;
701
702         /* If we use u64 in a few spots we may be able to loosen this */
703
704         if (ondisk->options.order > 8 * sizeof (int) - 1)
705                 return false;
706
707         /*
708          * The size of a snapshot header has to fit in a size_t, and
709          * that limits the number of snapshots.
710          */
711         snap_count = le32_to_cpu(ondisk->snap_count);
712         size = SIZE_MAX - sizeof (struct ceph_snap_context);
713         if (snap_count > size / sizeof (__le64))
714                 return false;
715
716         /*
717          * Not only that, but the size of the entire the snapshot
718          * header must also be representable in a size_t.
719          */
720         size -= snap_count * sizeof (__le64);
721         if ((u64) size < le64_to_cpu(ondisk->snap_names_len))
722                 return false;
723
724         return true;
725 }
726
727 /*
728  * Create a new header structure, translate header format from the on-disk
729  * header.
730  */
731 static int rbd_header_from_disk(struct rbd_image_header *header,
732                                  struct rbd_image_header_ondisk *ondisk)
733 {
734         u32 snap_count;
735         size_t len;
736         size_t size;
737         u32 i;
738
739         memset(header, 0, sizeof (*header));
740
741         snap_count = le32_to_cpu(ondisk->snap_count);
742
743         len = strnlen(ondisk->object_prefix, sizeof (ondisk->object_prefix));
744         header->object_prefix = kmalloc(len + 1, GFP_KERNEL);
745         if (!header->object_prefix)
746                 return -ENOMEM;
747         memcpy(header->object_prefix, ondisk->object_prefix, len);
748         header->object_prefix[len] = '\0';
749
750         if (snap_count) {
751                 u64 snap_names_len = le64_to_cpu(ondisk->snap_names_len);
752
753                 /* Save a copy of the snapshot names */
754
755                 if (snap_names_len > (u64) SIZE_MAX)
756                         return -EIO;
757                 header->snap_names = kmalloc(snap_names_len, GFP_KERNEL);
758                 if (!header->snap_names)
759                         goto out_err;
760                 /*
761                  * Note that rbd_dev_v1_header_read() guarantees
762                  * the ondisk buffer we're working with has
763                  * snap_names_len bytes beyond the end of the
764                  * snapshot id array, this memcpy() is safe.
765                  */
766                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
767                         snap_names_len);
768
769                 /* Record each snapshot's size */
770
771                 size = snap_count * sizeof (*header->snap_sizes);
772                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
773                 if (!header->snap_sizes)
774                         goto out_err;
775                 for (i = 0; i < snap_count; i++)
776                         header->snap_sizes[i] =
777                                 le64_to_cpu(ondisk->snaps[i].image_size);
778         } else {
779                 WARN_ON(ondisk->snap_names_len);
780                 header->snap_names = NULL;
781                 header->snap_sizes = NULL;
782         }
783
784         header->features = 0;   /* No features support in v1 images */
785         header->obj_order = ondisk->options.order;
786         header->crypt_type = ondisk->options.crypt_type;
787         header->comp_type = ondisk->options.comp_type;
788
789         /* Allocate and fill in the snapshot context */
790
791         header->image_size = le64_to_cpu(ondisk->image_size);
792         size = sizeof (struct ceph_snap_context);
793         size += snap_count * sizeof (header->snapc->snaps[0]);
794         header->snapc = kzalloc(size, GFP_KERNEL);
795         if (!header->snapc)
796                 goto out_err;
797
798         atomic_set(&header->snapc->nref, 1);
799         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
800         header->snapc->num_snaps = snap_count;
801         for (i = 0; i < snap_count; i++)
802                 header->snapc->snaps[i] =
803                         le64_to_cpu(ondisk->snaps[i].id);
804
805         return 0;
806
807 out_err:
808         kfree(header->snap_sizes);
809         header->snap_sizes = NULL;
810         kfree(header->snap_names);
811         header->snap_names = NULL;
812         kfree(header->object_prefix);
813         header->object_prefix = NULL;
814
815         return -ENOMEM;
816 }
817
818 static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
819 {
820         struct rbd_snap *snap;
821
822         if (snap_id == CEPH_NOSNAP)
823                 return RBD_SNAP_HEAD_NAME;
824
825         list_for_each_entry(snap, &rbd_dev->snaps, node)
826                 if (snap_id == snap->id)
827                         return snap->name;
828
829         return NULL;
830 }
831
832 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
833 {
834
835         struct rbd_snap *snap;
836
837         list_for_each_entry(snap, &rbd_dev->snaps, node) {
838                 if (!strcmp(snap_name, snap->name)) {
839                         rbd_dev->spec->snap_id = snap->id;
840                         rbd_dev->mapping.size = snap->size;
841                         rbd_dev->mapping.features = snap->features;
842
843                         return 0;
844                 }
845         }
846
847         return -ENOENT;
848 }
849
850 static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
851 {
852         int ret;
853
854         if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
855                     sizeof (RBD_SNAP_HEAD_NAME))) {
856                 rbd_dev->spec->snap_id = CEPH_NOSNAP;
857                 rbd_dev->mapping.size = rbd_dev->header.image_size;
858                 rbd_dev->mapping.features = rbd_dev->header.features;
859                 ret = 0;
860         } else {
861                 ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
862                 if (ret < 0)
863                         goto done;
864                 rbd_dev->mapping.read_only = true;
865         }
866         set_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
867
868 done:
869         return ret;
870 }
871
872 static void rbd_header_free(struct rbd_image_header *header)
873 {
874         kfree(header->object_prefix);
875         header->object_prefix = NULL;
876         kfree(header->snap_sizes);
877         header->snap_sizes = NULL;
878         kfree(header->snap_names);
879         header->snap_names = NULL;
880         ceph_put_snap_context(header->snapc);
881         header->snapc = NULL;
882 }
883
884 static const char *rbd_segment_name(struct rbd_device *rbd_dev, u64 offset)
885 {
886         char *name;
887         u64 segment;
888         int ret;
889
890         name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
891         if (!name)
892                 return NULL;
893         segment = offset >> rbd_dev->header.obj_order;
894         ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
895                         rbd_dev->header.object_prefix, segment);
896         if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
897                 pr_err("error formatting segment name for #%llu (%d)\n",
898                         segment, ret);
899                 kfree(name);
900                 name = NULL;
901         }
902
903         return name;
904 }
905
906 static u64 rbd_segment_offset(struct rbd_device *rbd_dev, u64 offset)
907 {
908         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
909
910         return offset & (segment_size - 1);
911 }
912
913 static u64 rbd_segment_length(struct rbd_device *rbd_dev,
914                                 u64 offset, u64 length)
915 {
916         u64 segment_size = (u64) 1 << rbd_dev->header.obj_order;
917
918         offset &= segment_size - 1;
919
920         rbd_assert(length <= U64_MAX - offset);
921         if (offset + length > segment_size)
922                 length = segment_size - offset;
923
924         return length;
925 }
926
927 /*
928  * returns the size of an object in the image
929  */
930 static u64 rbd_obj_bytes(struct rbd_image_header *header)
931 {
932         return 1 << header->obj_order;
933 }
934
935 /*
936  * bio helpers
937  */
938
939 static void bio_chain_put(struct bio *chain)
940 {
941         struct bio *tmp;
942
943         while (chain) {
944                 tmp = chain;
945                 chain = chain->bi_next;
946                 bio_put(tmp);
947         }
948 }
949
950 /*
951  * zeros a bio chain, starting at specific offset
952  */
953 static void zero_bio_chain(struct bio *chain, int start_ofs)
954 {
955         struct bio_vec *bv;
956         unsigned long flags;
957         void *buf;
958         int i;
959         int pos = 0;
960
961         while (chain) {
962                 bio_for_each_segment(bv, chain, i) {
963                         if (pos + bv->bv_len > start_ofs) {
964                                 int remainder = max(start_ofs - pos, 0);
965                                 buf = bvec_kmap_irq(bv, &flags);
966                                 memset(buf + remainder, 0,
967                                        bv->bv_len - remainder);
968                                 bvec_kunmap_irq(buf, &flags);
969                         }
970                         pos += bv->bv_len;
971                 }
972
973                 chain = chain->bi_next;
974         }
975 }
976
977 /*
978  * similar to zero_bio_chain(), zeros data defined by a page array,
979  * starting at the given byte offset from the start of the array and
980  * continuing up to the given end offset.  The pages array is
981  * assumed to be big enough to hold all bytes up to the end.
982  */
983 static void zero_pages(struct page **pages, u64 offset, u64 end)
984 {
985         struct page **page = &pages[offset >> PAGE_SHIFT];
986
987         rbd_assert(end > offset);
988         rbd_assert(end - offset <= (u64)SIZE_MAX);
989         while (offset < end) {
990                 size_t page_offset;
991                 size_t length;
992                 unsigned long flags;
993                 void *kaddr;
994
995                 page_offset = (size_t)(offset & ~PAGE_MASK);
996                 length = min(PAGE_SIZE - page_offset, (size_t)(end - offset));
997                 local_irq_save(flags);
998                 kaddr = kmap_atomic(*page);
999                 memset(kaddr + page_offset, 0, length);
1000                 kunmap_atomic(kaddr);
1001                 local_irq_restore(flags);
1002
1003                 offset += length;
1004                 page++;
1005         }
1006 }
1007
1008 /*
1009  * Clone a portion of a bio, starting at the given byte offset
1010  * and continuing for the number of bytes indicated.
1011  */
1012 static struct bio *bio_clone_range(struct bio *bio_src,
1013                                         unsigned int offset,
1014                                         unsigned int len,
1015                                         gfp_t gfpmask)
1016 {
1017         struct bio_vec *bv;
1018         unsigned int resid;
1019         unsigned short idx;
1020         unsigned int voff;
1021         unsigned short end_idx;
1022         unsigned short vcnt;
1023         struct bio *bio;
1024
1025         /* Handle the easy case for the caller */
1026
1027         if (!offset && len == bio_src->bi_size)
1028                 return bio_clone(bio_src, gfpmask);
1029
1030         if (WARN_ON_ONCE(!len))
1031                 return NULL;
1032         if (WARN_ON_ONCE(len > bio_src->bi_size))
1033                 return NULL;
1034         if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
1035                 return NULL;
1036
1037         /* Find first affected segment... */
1038
1039         resid = offset;
1040         __bio_for_each_segment(bv, bio_src, idx, 0) {
1041                 if (resid < bv->bv_len)
1042                         break;
1043                 resid -= bv->bv_len;
1044         }
1045         voff = resid;
1046
1047         /* ...and the last affected segment */
1048
1049         resid += len;
1050         __bio_for_each_segment(bv, bio_src, end_idx, idx) {
1051                 if (resid <= bv->bv_len)
1052                         break;
1053                 resid -= bv->bv_len;
1054         }
1055         vcnt = end_idx - idx + 1;
1056
1057         /* Build the clone */
1058
1059         bio = bio_alloc(gfpmask, (unsigned int) vcnt);
1060         if (!bio)
1061                 return NULL;    /* ENOMEM */
1062
1063         bio->bi_bdev = bio_src->bi_bdev;
1064         bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
1065         bio->bi_rw = bio_src->bi_rw;
1066         bio->bi_flags |= 1 << BIO_CLONED;
1067
1068         /*
1069          * Copy over our part of the bio_vec, then update the first
1070          * and last (or only) entries.
1071          */
1072         memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
1073                         vcnt * sizeof (struct bio_vec));
1074         bio->bi_io_vec[0].bv_offset += voff;
1075         if (vcnt > 1) {
1076                 bio->bi_io_vec[0].bv_len -= voff;
1077                 bio->bi_io_vec[vcnt - 1].bv_len = resid;
1078         } else {
1079                 bio->bi_io_vec[0].bv_len = len;
1080         }
1081
1082         bio->bi_vcnt = vcnt;
1083         bio->bi_size = len;
1084         bio->bi_idx = 0;
1085
1086         return bio;
1087 }
1088
1089 /*
1090  * Clone a portion of a bio chain, starting at the given byte offset
1091  * into the first bio in the source chain and continuing for the
1092  * number of bytes indicated.  The result is another bio chain of
1093  * exactly the given length, or a null pointer on error.
1094  *
1095  * The bio_src and offset parameters are both in-out.  On entry they
1096  * refer to the first source bio and the offset into that bio where
1097  * the start of data to be cloned is located.
1098  *
1099  * On return, bio_src is updated to refer to the bio in the source
1100  * chain that contains first un-cloned byte, and *offset will
1101  * contain the offset of that byte within that bio.
1102  */
1103 static struct bio *bio_chain_clone_range(struct bio **bio_src,
1104                                         unsigned int *offset,
1105                                         unsigned int len,
1106                                         gfp_t gfpmask)
1107 {
1108         struct bio *bi = *bio_src;
1109         unsigned int off = *offset;
1110         struct bio *chain = NULL;
1111         struct bio **end;
1112
1113         /* Build up a chain of clone bios up to the limit */
1114
1115         if (!bi || off >= bi->bi_size || !len)
1116                 return NULL;            /* Nothing to clone */
1117
1118         end = &chain;
1119         while (len) {
1120                 unsigned int bi_size;
1121                 struct bio *bio;
1122
1123                 if (!bi) {
1124                         rbd_warn(NULL, "bio_chain exhausted with %u left", len);
1125                         goto out_err;   /* EINVAL; ran out of bio's */
1126                 }
1127                 bi_size = min_t(unsigned int, bi->bi_size - off, len);
1128                 bio = bio_clone_range(bi, off, bi_size, gfpmask);
1129                 if (!bio)
1130                         goto out_err;   /* ENOMEM */
1131
1132                 *end = bio;
1133                 end = &bio->bi_next;
1134
1135                 off += bi_size;
1136                 if (off == bi->bi_size) {
1137                         bi = bi->bi_next;
1138                         off = 0;
1139                 }
1140                 len -= bi_size;
1141         }
1142         *bio_src = bi;
1143         *offset = off;
1144
1145         return chain;
1146 out_err:
1147         bio_chain_put(chain);
1148
1149         return NULL;
1150 }
1151
1152 /*
1153  * The default/initial value for all object request flags is 0.  For
1154  * each flag, once its value is set to 1 it is never reset to 0
1155  * again.
1156  */
1157 static void obj_request_img_data_set(struct rbd_obj_request *obj_request)
1158 {
1159         if (test_and_set_bit(OBJ_REQ_IMG_DATA, &obj_request->flags)) {
1160                 struct rbd_device *rbd_dev;
1161
1162                 rbd_dev = obj_request->img_request->rbd_dev;
1163                 rbd_warn(rbd_dev, "obj_request %p already marked img_data\n",
1164                         obj_request);
1165         }
1166 }
1167
1168 static bool obj_request_img_data_test(struct rbd_obj_request *obj_request)
1169 {
1170         smp_mb();
1171         return test_bit(OBJ_REQ_IMG_DATA, &obj_request->flags) != 0;
1172 }
1173
1174 static void obj_request_done_set(struct rbd_obj_request *obj_request)
1175 {
1176         if (test_and_set_bit(OBJ_REQ_DONE, &obj_request->flags)) {
1177                 struct rbd_device *rbd_dev = NULL;
1178
1179                 if (obj_request_img_data_test(obj_request))
1180                         rbd_dev = obj_request->img_request->rbd_dev;
1181                 rbd_warn(rbd_dev, "obj_request %p already marked done\n",
1182                         obj_request);
1183         }
1184 }
1185
1186 static bool obj_request_done_test(struct rbd_obj_request *obj_request)
1187 {
1188         smp_mb();
1189         return test_bit(OBJ_REQ_DONE, &obj_request->flags) != 0;
1190 }
1191
1192 /*
1193  * This sets the KNOWN flag after (possibly) setting the EXISTS
1194  * flag.  The latter is set based on the "exists" value provided.
1195  *
1196  * Note that for our purposes once an object exists it never goes
1197  * away again.  It's possible that the response from two existence
1198  * checks are separated by the creation of the target object, and
1199  * the first ("doesn't exist") response arrives *after* the second
1200  * ("does exist").  In that case we ignore the second one.
1201  */
1202 static void obj_request_existence_set(struct rbd_obj_request *obj_request,
1203                                 bool exists)
1204 {
1205         if (exists)
1206                 set_bit(OBJ_REQ_EXISTS, &obj_request->flags);
1207         set_bit(OBJ_REQ_KNOWN, &obj_request->flags);
1208         smp_mb();
1209 }
1210
1211 static bool obj_request_known_test(struct rbd_obj_request *obj_request)
1212 {
1213         smp_mb();
1214         return test_bit(OBJ_REQ_KNOWN, &obj_request->flags) != 0;
1215 }
1216
1217 static bool obj_request_exists_test(struct rbd_obj_request *obj_request)
1218 {
1219         smp_mb();
1220         return test_bit(OBJ_REQ_EXISTS, &obj_request->flags) != 0;
1221 }
1222
1223 static void rbd_obj_request_get(struct rbd_obj_request *obj_request)
1224 {
1225         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1226                 atomic_read(&obj_request->kref.refcount));
1227         kref_get(&obj_request->kref);
1228 }
1229
1230 static void rbd_obj_request_destroy(struct kref *kref);
1231 static void rbd_obj_request_put(struct rbd_obj_request *obj_request)
1232 {
1233         rbd_assert(obj_request != NULL);
1234         dout("%s: obj %p (was %d)\n", __func__, obj_request,
1235                 atomic_read(&obj_request->kref.refcount));
1236         kref_put(&obj_request->kref, rbd_obj_request_destroy);
1237 }
1238
1239 static void rbd_img_request_get(struct rbd_img_request *img_request)
1240 {
1241         dout("%s: img %p (was %d)\n", __func__, img_request,
1242                 atomic_read(&img_request->kref.refcount));
1243         kref_get(&img_request->kref);
1244 }
1245
1246 static void rbd_img_request_destroy(struct kref *kref);
1247 static void rbd_img_request_put(struct rbd_img_request *img_request)
1248 {
1249         rbd_assert(img_request != NULL);
1250         dout("%s: img %p (was %d)\n", __func__, img_request,
1251                 atomic_read(&img_request->kref.refcount));
1252         kref_put(&img_request->kref, rbd_img_request_destroy);
1253 }
1254
1255 static inline void rbd_img_obj_request_add(struct rbd_img_request *img_request,
1256                                         struct rbd_obj_request *obj_request)
1257 {
1258         rbd_assert(obj_request->img_request == NULL);
1259
1260         /* Image request now owns object's original reference */
1261         obj_request->img_request = img_request;
1262         obj_request->which = img_request->obj_request_count;
1263         rbd_assert(!obj_request_img_data_test(obj_request));
1264         obj_request_img_data_set(obj_request);
1265         rbd_assert(obj_request->which != BAD_WHICH);
1266         img_request->obj_request_count++;
1267         list_add_tail(&obj_request->links, &img_request->obj_requests);
1268         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1269                 obj_request->which);
1270 }
1271
1272 static inline void rbd_img_obj_request_del(struct rbd_img_request *img_request,
1273                                         struct rbd_obj_request *obj_request)
1274 {
1275         rbd_assert(obj_request->which != BAD_WHICH);
1276
1277         dout("%s: img %p obj %p w=%u\n", __func__, img_request, obj_request,
1278                 obj_request->which);
1279         list_del(&obj_request->links);
1280         rbd_assert(img_request->obj_request_count > 0);
1281         img_request->obj_request_count--;
1282         rbd_assert(obj_request->which == img_request->obj_request_count);
1283         obj_request->which = BAD_WHICH;
1284         rbd_assert(obj_request_img_data_test(obj_request));
1285         rbd_assert(obj_request->img_request == img_request);
1286         obj_request->img_request = NULL;
1287         obj_request->callback = NULL;
1288         rbd_obj_request_put(obj_request);
1289 }
1290
1291 static bool obj_request_type_valid(enum obj_request_type type)
1292 {
1293         switch (type) {
1294         case OBJ_REQUEST_NODATA:
1295         case OBJ_REQUEST_BIO:
1296         case OBJ_REQUEST_PAGES:
1297                 return true;
1298         default:
1299                 return false;
1300         }
1301 }
1302
1303 static int rbd_obj_request_submit(struct ceph_osd_client *osdc,
1304                                 struct rbd_obj_request *obj_request)
1305 {
1306         dout("%s: osdc %p obj %p\n", __func__, osdc, obj_request);
1307
1308         return ceph_osdc_start_request(osdc, obj_request->osd_req, false);
1309 }
1310
1311 static void rbd_img_request_complete(struct rbd_img_request *img_request)
1312 {
1313
1314         dout("%s: img %p\n", __func__, img_request);
1315
1316         /*
1317          * If no error occurred, compute the aggregate transfer
1318          * count for the image request.  We could instead use
1319          * atomic64_cmpxchg() to update it as each object request
1320          * completes; not clear which way is better off hand.
1321          */
1322         if (!img_request->result) {
1323                 struct rbd_obj_request *obj_request;
1324                 u64 xferred = 0;
1325
1326                 for_each_obj_request(img_request, obj_request)
1327                         xferred += obj_request->xferred;
1328                 img_request->xferred = xferred;
1329         }
1330
1331         if (img_request->callback)
1332                 img_request->callback(img_request);
1333         else
1334                 rbd_img_request_put(img_request);
1335 }
1336
1337 /* Caller is responsible for rbd_obj_request_destroy(obj_request) */
1338
1339 static int rbd_obj_request_wait(struct rbd_obj_request *obj_request)
1340 {
1341         dout("%s: obj %p\n", __func__, obj_request);
1342
1343         return wait_for_completion_interruptible(&obj_request->completion);
1344 }
1345
1346 /*
1347  * The default/initial value for all image request flags is 0.  Each
1348  * is conditionally set to 1 at image request initialization time
1349  * and currently never change thereafter.
1350  */
1351 static void img_request_write_set(struct rbd_img_request *img_request)
1352 {
1353         set_bit(IMG_REQ_WRITE, &img_request->flags);
1354         smp_mb();
1355 }
1356
1357 static bool img_request_write_test(struct rbd_img_request *img_request)
1358 {
1359         smp_mb();
1360         return test_bit(IMG_REQ_WRITE, &img_request->flags) != 0;
1361 }
1362
1363 static void img_request_child_set(struct rbd_img_request *img_request)
1364 {
1365         set_bit(IMG_REQ_CHILD, &img_request->flags);
1366         smp_mb();
1367 }
1368
1369 static bool img_request_child_test(struct rbd_img_request *img_request)
1370 {
1371         smp_mb();
1372         return test_bit(IMG_REQ_CHILD, &img_request->flags) != 0;
1373 }
1374
1375 static void img_request_layered_set(struct rbd_img_request *img_request)
1376 {
1377         set_bit(IMG_REQ_LAYERED, &img_request->flags);
1378         smp_mb();
1379 }
1380
1381 static bool img_request_layered_test(struct rbd_img_request *img_request)
1382 {
1383         smp_mb();
1384         return test_bit(IMG_REQ_LAYERED, &img_request->flags) != 0;
1385 }
1386
1387 static void
1388 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
1389 {
1390         u64 xferred = obj_request->xferred;
1391         u64 length = obj_request->length;
1392
1393         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1394                 obj_request, obj_request->img_request, obj_request->result,
1395                 xferred, length);
1396         /*
1397          * ENOENT means a hole in the image.  We zero-fill the
1398          * entire length of the request.  A short read also implies
1399          * zero-fill to the end of the request.  Either way we
1400          * update the xferred count to indicate the whole request
1401          * was satisfied.
1402          */
1403         rbd_assert(obj_request->type != OBJ_REQUEST_NODATA);
1404         if (obj_request->result == -ENOENT) {
1405                 if (obj_request->type == OBJ_REQUEST_BIO)
1406                         zero_bio_chain(obj_request->bio_list, 0);
1407                 else
1408                         zero_pages(obj_request->pages, 0, length);
1409                 obj_request->result = 0;
1410                 obj_request->xferred = length;
1411         } else if (xferred < length && !obj_request->result) {
1412                 if (obj_request->type == OBJ_REQUEST_BIO)
1413                         zero_bio_chain(obj_request->bio_list, xferred);
1414                 else
1415                         zero_pages(obj_request->pages, xferred, length);
1416                 obj_request->xferred = length;
1417         }
1418         obj_request_done_set(obj_request);
1419 }
1420
1421 static void rbd_obj_request_complete(struct rbd_obj_request *obj_request)
1422 {
1423         dout("%s: obj %p cb %p\n", __func__, obj_request,
1424                 obj_request->callback);
1425         if (obj_request->callback)
1426                 obj_request->callback(obj_request);
1427         else
1428                 complete_all(&obj_request->completion);
1429 }
1430
/* Nothing to post-process for this op; just mark the request done. */
static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1436
1437 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
1438 {
1439         struct rbd_img_request *img_request = NULL;
1440         struct rbd_device *rbd_dev = NULL;
1441         bool layered = false;
1442
1443         if (obj_request_img_data_test(obj_request)) {
1444                 img_request = obj_request->img_request;
1445                 layered = img_request && img_request_layered_test(img_request);
1446                 rbd_dev = img_request->rbd_dev;
1447         }
1448
1449         dout("%s: obj %p img %p result %d %llu/%llu\n", __func__,
1450                 obj_request, img_request, obj_request->result,
1451                 obj_request->xferred, obj_request->length);
1452         if (layered && obj_request->result == -ENOENT &&
1453                         obj_request->img_offset < rbd_dev->parent_overlap)
1454                 rbd_img_parent_read(obj_request);
1455         else if (img_request)
1456                 rbd_img_obj_request_read_callback(obj_request);
1457         else
1458                 obj_request_done_set(obj_request);
1459 }
1460
1461 static void rbd_osd_write_callback(struct rbd_obj_request *obj_request)
1462 {
1463         dout("%s: obj %p result %d %llu\n", __func__, obj_request,
1464                 obj_request->result, obj_request->length);
1465         /*
1466          * There is no such thing as a successful short write.  Set
1467          * it to our originally-requested length.
1468          */
1469         obj_request->xferred = obj_request->length;
1470         obj_request_done_set(obj_request);
1471 }
1472
/*
 * A plain stat call needs no post-processing, so the request is just
 * marked done.  More will be done here when the stat is part of a
 * write sequence for a layered image.
 */
static void rbd_osd_stat_callback(struct rbd_obj_request *obj_request)
{
	dout("%s: obj %p\n", __func__, obj_request);

	obj_request_done_set(obj_request);
}
1482
1483 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req,
1484                                 struct ceph_msg *msg)
1485 {
1486         struct rbd_obj_request *obj_request = osd_req->r_priv;
1487         u16 opcode;
1488
1489         dout("%s: osd_req %p msg %p\n", __func__, osd_req, msg);
1490         rbd_assert(osd_req == obj_request->osd_req);
1491         if (obj_request_img_data_test(obj_request)) {
1492                 rbd_assert(obj_request->img_request);
1493                 rbd_assert(obj_request->which != BAD_WHICH);
1494         } else {
1495                 rbd_assert(obj_request->which == BAD_WHICH);
1496         }
1497
1498         if (osd_req->r_result < 0)
1499                 obj_request->result = osd_req->r_result;
1500         obj_request->version = le64_to_cpu(osd_req->r_reassert_version.version);
1501
1502         BUG_ON(osd_req->r_num_ops > 2);
1503
1504         /*
1505          * We support a 64-bit length, but ultimately it has to be
1506          * passed to blk_end_request(), which takes an unsigned int.
1507          */
1508         obj_request->xferred = osd_req->r_reply_op_len[0];
1509         rbd_assert(obj_request->xferred < (u64)UINT_MAX);
1510         opcode = osd_req->r_ops[0].op;
1511         switch (opcode) {
1512         case CEPH_OSD_OP_READ:
1513                 rbd_osd_read_callback(obj_request);
1514                 break;
1515         case CEPH_OSD_OP_WRITE:
1516                 rbd_osd_write_callback(obj_request);
1517                 break;
1518         case CEPH_OSD_OP_STAT:
1519                 rbd_osd_stat_callback(obj_request);
1520                 break;
1521         case CEPH_OSD_OP_CALL:
1522         case CEPH_OSD_OP_NOTIFY_ACK:
1523         case CEPH_OSD_OP_WATCH:
1524                 rbd_osd_trivial_callback(obj_request);
1525                 break;
1526         default:
1527                 rbd_warn(NULL, "%s: unsupported op %hu\n",
1528                         obj_request->object_name, (unsigned short) opcode);
1529                 break;
1530         }
1531
1532         if (obj_request_done_test(obj_request))
1533                 rbd_obj_request_complete(obj_request);
1534 }
1535
1536 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
1537 {
1538         struct rbd_img_request *img_request = obj_request->img_request;
1539         struct ceph_osd_request *osd_req = obj_request->osd_req;
1540         u64 snap_id;
1541
1542         rbd_assert(osd_req != NULL);
1543
1544         snap_id = img_request ? img_request->snap_id : CEPH_NOSNAP;
1545         ceph_osdc_build_request(osd_req, obj_request->offset,
1546                         NULL, snap_id, NULL);
1547 }
1548
1549 static void rbd_osd_req_format_write(struct rbd_obj_request *obj_request)
1550 {
1551         struct rbd_img_request *img_request = obj_request->img_request;
1552         struct ceph_osd_request *osd_req = obj_request->osd_req;
1553         struct ceph_snap_context *snapc;
1554         struct timespec mtime = CURRENT_TIME;
1555
1556         rbd_assert(osd_req != NULL);
1557
1558         snapc = img_request ? img_request->snapc : NULL;
1559         ceph_osdc_build_request(osd_req, obj_request->offset,
1560                         snapc, CEPH_NOSNAP, &mtime);
1561 }
1562
1563 static struct ceph_osd_request *rbd_osd_req_create(
1564                                         struct rbd_device *rbd_dev,
1565                                         bool write_request,
1566                                         struct rbd_obj_request *obj_request)
1567 {
1568         struct ceph_snap_context *snapc = NULL;
1569         struct ceph_osd_client *osdc;
1570         struct ceph_osd_request *osd_req;
1571
1572         if (obj_request_img_data_test(obj_request)) {
1573                 struct rbd_img_request *img_request = obj_request->img_request;
1574
1575                 rbd_assert(write_request ==
1576                                 img_request_write_test(img_request));
1577                 if (write_request)
1578                         snapc = img_request->snapc;
1579         }
1580
1581         /* Allocate and initialize the request, for the single op */
1582
1583         osdc = &rbd_dev->rbd_client->client->osdc;
1584         osd_req = ceph_osdc_alloc_request(osdc, snapc, 1, false, GFP_ATOMIC);
1585         if (!osd_req)
1586                 return NULL;    /* ENOMEM */
1587
1588         if (write_request)
1589                 osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1590         else
1591                 osd_req->r_flags = CEPH_OSD_FLAG_READ;
1592
1593         osd_req->r_callback = rbd_osd_req_callback;
1594         osd_req->r_priv = obj_request;
1595
1596         osd_req->r_oid_len = strlen(obj_request->object_name);
1597         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1598         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1599
1600         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1601
1602         return osd_req;
1603 }
1604
1605 /*
1606  * Create a copyup osd request based on the information in the
1607  * object request supplied.  A copyup request has two osd ops,
1608  * a copyup method call, and a "normal" write request.
1609  */
1610 static struct ceph_osd_request *
1611 rbd_osd_req_create_copyup(struct rbd_obj_request *obj_request)
1612 {
1613         struct rbd_img_request *img_request;
1614         struct ceph_snap_context *snapc;
1615         struct rbd_device *rbd_dev;
1616         struct ceph_osd_client *osdc;
1617         struct ceph_osd_request *osd_req;
1618
1619         rbd_assert(obj_request_img_data_test(obj_request));
1620         img_request = obj_request->img_request;
1621         rbd_assert(img_request);
1622         rbd_assert(img_request_write_test(img_request));
1623
1624         /* Allocate and initialize the request, for the two ops */
1625
1626         snapc = img_request->snapc;
1627         rbd_dev = img_request->rbd_dev;
1628         osdc = &rbd_dev->rbd_client->client->osdc;
1629         osd_req = ceph_osdc_alloc_request(osdc, snapc, 2, false, GFP_ATOMIC);
1630         if (!osd_req)
1631                 return NULL;    /* ENOMEM */
1632
1633         osd_req->r_flags = CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK;
1634         osd_req->r_callback = rbd_osd_req_callback;
1635         osd_req->r_priv = obj_request;
1636
1637         osd_req->r_oid_len = strlen(obj_request->object_name);
1638         rbd_assert(osd_req->r_oid_len < sizeof (osd_req->r_oid));
1639         memcpy(osd_req->r_oid, obj_request->object_name, osd_req->r_oid_len);
1640
1641         osd_req->r_file_layout = rbd_dev->layout;       /* struct */
1642
1643         return osd_req;
1644 }
1645
1646
/* Drop this driver's reference on an osd request. */
static void rbd_osd_req_destroy(struct ceph_osd_request *osd_req)
{
	ceph_osdc_put_request(osd_req);
}
1651
1652 /* object_name is assumed to be a non-null pointer and NUL-terminated */
1653
1654 static struct rbd_obj_request *rbd_obj_request_create(const char *object_name,
1655                                                 u64 offset, u64 length,
1656                                                 enum obj_request_type type)
1657 {
1658         struct rbd_obj_request *obj_request;
1659         size_t size;
1660         char *name;
1661
1662         rbd_assert(obj_request_type_valid(type));
1663
1664         size = strlen(object_name) + 1;
1665         obj_request = kzalloc(sizeof (*obj_request) + size, GFP_KERNEL);
1666         if (!obj_request)
1667                 return NULL;
1668
1669         name = (char *)(obj_request + 1);
1670         obj_request->object_name = memcpy(name, object_name, size);
1671         obj_request->offset = offset;
1672         obj_request->length = length;
1673         obj_request->flags = 0;
1674         obj_request->which = BAD_WHICH;
1675         obj_request->type = type;
1676         INIT_LIST_HEAD(&obj_request->links);
1677         init_completion(&obj_request->completion);
1678         kref_init(&obj_request->kref);
1679
1680         dout("%s: \"%s\" %llu/%llu %d -> obj %p\n", __func__, object_name,
1681                 offset, length, (int)type, obj_request);
1682
1683         return obj_request;
1684 }
1685
1686 static void rbd_obj_request_destroy(struct kref *kref)
1687 {
1688         struct rbd_obj_request *obj_request;
1689
1690         obj_request = container_of(kref, struct rbd_obj_request, kref);
1691
1692         dout("%s: obj %p\n", __func__, obj_request);
1693
1694         rbd_assert(obj_request->img_request == NULL);
1695         rbd_assert(obj_request->which == BAD_WHICH);
1696
1697         if (obj_request->osd_req)
1698                 rbd_osd_req_destroy(obj_request->osd_req);
1699
1700         rbd_assert(obj_request_type_valid(obj_request->type));
1701         switch (obj_request->type) {
1702         case OBJ_REQUEST_NODATA:
1703                 break;          /* Nothing to do */
1704         case OBJ_REQUEST_BIO:
1705                 if (obj_request->bio_list)
1706                         bio_chain_put(obj_request->bio_list);
1707                 break;
1708         case OBJ_REQUEST_PAGES:
1709                 if (obj_request->pages)
1710                         ceph_release_page_vector(obj_request->pages,
1711                                                 obj_request->page_count);
1712                 break;
1713         }
1714
1715         kfree(obj_request);
1716 }
1717
1718 /*
1719  * Caller is responsible for filling in the list of object requests
1720  * that comprises the image request, and the Linux request pointer
1721  * (if there is one).
1722  */
1723 static struct rbd_img_request *rbd_img_request_create(
1724                                         struct rbd_device *rbd_dev,
1725                                         u64 offset, u64 length,
1726                                         bool write_request,
1727                                         bool child_request)
1728 {
1729         struct rbd_img_request *img_request;
1730         struct ceph_snap_context *snapc = NULL;
1731
1732         img_request = kmalloc(sizeof (*img_request), GFP_ATOMIC);
1733         if (!img_request)
1734                 return NULL;
1735
1736         if (write_request) {
1737                 down_read(&rbd_dev->header_rwsem);
1738                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1739                 up_read(&rbd_dev->header_rwsem);
1740                 if (WARN_ON(!snapc)) {
1741                         kfree(img_request);
1742                         return NULL;    /* Shouldn't happen */
1743                 }
1744
1745         }
1746
1747         img_request->rq = NULL;
1748         img_request->rbd_dev = rbd_dev;
1749         img_request->offset = offset;
1750         img_request->length = length;
1751         img_request->flags = 0;
1752         if (write_request) {
1753                 img_request_write_set(img_request);
1754                 img_request->snapc = snapc;
1755         } else {
1756                 img_request->snap_id = rbd_dev->spec->snap_id;
1757         }
1758         if (child_request)
1759                 img_request_child_set(img_request);
1760         if (rbd_dev->parent_spec)
1761                 img_request_layered_set(img_request);
1762         spin_lock_init(&img_request->completion_lock);
1763         img_request->next_completion = 0;
1764         img_request->callback = NULL;
1765         img_request->result = 0;
1766         img_request->obj_request_count = 0;
1767         INIT_LIST_HEAD(&img_request->obj_requests);
1768         kref_init(&img_request->kref);
1769
1770         rbd_img_request_get(img_request);       /* Avoid a warning */
1771         rbd_img_request_put(img_request);       /* TEMPORARY */
1772
1773         dout("%s: rbd_dev %p %s %llu/%llu -> img %p\n", __func__, rbd_dev,
1774                 write_request ? "write" : "read", offset, length,
1775                 img_request);
1776
1777         return img_request;
1778 }
1779
1780 static void rbd_img_request_destroy(struct kref *kref)
1781 {
1782         struct rbd_img_request *img_request;
1783         struct rbd_obj_request *obj_request;
1784         struct rbd_obj_request *next_obj_request;
1785
1786         img_request = container_of(kref, struct rbd_img_request, kref);
1787
1788         dout("%s: img %p\n", __func__, img_request);
1789
1790         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
1791                 rbd_img_obj_request_del(img_request, obj_request);
1792         rbd_assert(img_request->obj_request_count == 0);
1793
1794         if (img_request_write_test(img_request))
1795                 ceph_put_snap_context(img_request->snapc);
1796
1797         if (img_request_child_test(img_request))
1798                 rbd_obj_request_put(img_request->obj_request);
1799
1800         kfree(img_request);
1801 }
1802
1803 static bool rbd_img_obj_end_request(struct rbd_obj_request *obj_request)
1804 {
1805         struct rbd_img_request *img_request;
1806         unsigned int xferred;
1807         int result;
1808         bool more;
1809
1810         rbd_assert(obj_request_img_data_test(obj_request));
1811         img_request = obj_request->img_request;
1812
1813         rbd_assert(obj_request->xferred <= (u64)UINT_MAX);
1814         xferred = (unsigned int)obj_request->xferred;
1815         result = obj_request->result;
1816         if (result) {
1817                 struct rbd_device *rbd_dev = img_request->rbd_dev;
1818
1819                 rbd_warn(rbd_dev, "%s %llx at %llx (%llx)\n",
1820                         img_request_write_test(img_request) ? "write" : "read",
1821                         obj_request->length, obj_request->img_offset,
1822                         obj_request->offset);
1823                 rbd_warn(rbd_dev, "  result %d xferred %x\n",
1824                         result, xferred);
1825                 if (!img_request->result)
1826                         img_request->result = result;
1827         }
1828
1829         /* Image object requests don't own their page array */
1830
1831         if (obj_request->type == OBJ_REQUEST_PAGES) {
1832                 obj_request->pages = NULL;
1833                 obj_request->page_count = 0;
1834         }
1835
1836         if (img_request_child_test(img_request)) {
1837                 rbd_assert(img_request->obj_request != NULL);
1838                 more = obj_request->which < img_request->obj_request_count - 1;
1839         } else {
1840                 rbd_assert(img_request->rq != NULL);
1841                 more = blk_end_request(img_request->rq, result, xferred);
1842         }
1843
1844         return more;
1845 }
1846
1847 static void rbd_img_obj_callback(struct rbd_obj_request *obj_request)
1848 {
1849         struct rbd_img_request *img_request;
1850         u32 which = obj_request->which;
1851         bool more = true;
1852
1853         rbd_assert(obj_request_img_data_test(obj_request));
1854         img_request = obj_request->img_request;
1855
1856         dout("%s: img %p obj %p\n", __func__, img_request, obj_request);
1857         rbd_assert(img_request != NULL);
1858         rbd_assert(img_request->obj_request_count > 0);
1859         rbd_assert(which != BAD_WHICH);
1860         rbd_assert(which < img_request->obj_request_count);
1861         rbd_assert(which >= img_request->next_completion);
1862
1863         spin_lock_irq(&img_request->completion_lock);
1864         if (which != img_request->next_completion)
1865                 goto out;
1866
1867         for_each_obj_request_from(img_request, obj_request) {
1868                 rbd_assert(more);
1869                 rbd_assert(which < img_request->obj_request_count);
1870
1871                 if (!obj_request_done_test(obj_request))
1872                         break;
1873                 more = rbd_img_obj_end_request(obj_request);
1874                 which++;
1875         }
1876
1877         rbd_assert(more ^ (which == img_request->obj_request_count));
1878         img_request->next_completion = which;
1879 out:
1880         spin_unlock_irq(&img_request->completion_lock);
1881
1882         if (!more)
1883                 rbd_img_request_complete(img_request);
1884 }
1885
1886 /*
1887  * Split up an image request into one or more object requests, each
1888  * to a different object.  The "type" parameter indicates whether
1889  * "data_desc" is the pointer to the head of a list of bio
1890  * structures, or the base of a page array.  In either case this
1891  * function assumes data_desc describes memory sufficient to hold
1892  * all data described by the image request.
1893  */
1894 static int rbd_img_request_fill(struct rbd_img_request *img_request,
1895                                         enum obj_request_type type,
1896                                         void *data_desc)
1897 {
1898         struct rbd_device *rbd_dev = img_request->rbd_dev;
1899         struct rbd_obj_request *obj_request = NULL;
1900         struct rbd_obj_request *next_obj_request;
1901         bool write_request = img_request_write_test(img_request);
1902         struct bio *bio_list;
1903         unsigned int bio_offset = 0;
1904         struct page **pages;
1905         u64 img_offset;
1906         u64 resid;
1907         u16 opcode;
1908
1909         dout("%s: img %p type %d data_desc %p\n", __func__, img_request,
1910                 (int)type, data_desc);
1911
1912         opcode = write_request ? CEPH_OSD_OP_WRITE : CEPH_OSD_OP_READ;
1913         img_offset = img_request->offset;
1914         resid = img_request->length;
1915         rbd_assert(resid > 0);
1916
1917         if (type == OBJ_REQUEST_BIO) {
1918                 bio_list = data_desc;
1919                 rbd_assert(img_offset == bio_list->bi_sector << SECTOR_SHIFT);
1920         } else {
1921                 rbd_assert(type == OBJ_REQUEST_PAGES);
1922                 pages = data_desc;
1923         }
1924
1925         while (resid) {
1926                 struct ceph_osd_request *osd_req;
1927                 const char *object_name;
1928                 u64 offset;
1929                 u64 length;
1930
1931                 object_name = rbd_segment_name(rbd_dev, img_offset);
1932                 if (!object_name)
1933                         goto out_unwind;
1934                 offset = rbd_segment_offset(rbd_dev, img_offset);
1935                 length = rbd_segment_length(rbd_dev, img_offset, resid);
1936                 obj_request = rbd_obj_request_create(object_name,
1937                                                 offset, length, type);
1938                 kfree(object_name);     /* object request has its own copy */
1939                 if (!obj_request)
1940                         goto out_unwind;
1941
1942                 if (type == OBJ_REQUEST_BIO) {
1943                         unsigned int clone_size;
1944
1945                         rbd_assert(length <= (u64)UINT_MAX);
1946                         clone_size = (unsigned int)length;
1947                         obj_request->bio_list =
1948                                         bio_chain_clone_range(&bio_list,
1949                                                                 &bio_offset,
1950                                                                 clone_size,
1951                                                                 GFP_ATOMIC);
1952                         if (!obj_request->bio_list)
1953                                 goto out_partial;
1954                 } else {
1955                         unsigned int page_count;
1956
1957                         obj_request->pages = pages;
1958                         page_count = (u32)calc_pages_for(offset, length);
1959                         obj_request->page_count = page_count;
1960                         if ((offset + length) & ~PAGE_MASK)
1961                                 page_count--;   /* more on last page */
1962                         pages += page_count;
1963                 }
1964
1965                 osd_req = rbd_osd_req_create(rbd_dev, write_request,
1966                                                 obj_request);
1967                 if (!osd_req)
1968                         goto out_partial;
1969                 obj_request->osd_req = osd_req;
1970                 obj_request->callback = rbd_img_obj_callback;
1971
1972                 osd_req_op_extent_init(osd_req, 0, opcode, offset, length,
1973                                                 0, 0);
1974                 if (type == OBJ_REQUEST_BIO)
1975                         osd_req_op_extent_osd_data_bio(osd_req, 0,
1976                                         obj_request->bio_list, length);
1977                 else
1978                         osd_req_op_extent_osd_data_pages(osd_req, 0,
1979                                         obj_request->pages, length,
1980                                         offset & ~PAGE_MASK, false, false);
1981
1982                 if (write_request)
1983                         rbd_osd_req_format_write(obj_request);
1984                 else
1985                         rbd_osd_req_format_read(obj_request);
1986
1987                 obj_request->img_offset = img_offset;
1988                 rbd_img_obj_request_add(img_request, obj_request);
1989
1990                 img_offset += length;
1991                 resid -= length;
1992         }
1993
1994         return 0;
1995
1996 out_partial:
1997         rbd_obj_request_put(obj_request);
1998 out_unwind:
1999         for_each_obj_request_safe(img_request, obj_request, next_obj_request)
2000                 rbd_obj_request_put(obj_request);
2001
2002         return -ENOMEM;
2003 }
2004
2005 static void
2006 rbd_img_obj_copyup_callback(struct rbd_obj_request *obj_request)
2007 {
2008         struct rbd_img_request *img_request;
2009         struct rbd_device *rbd_dev;
2010         u64 length;
2011         u32 page_count;
2012
2013         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2014         rbd_assert(obj_request_img_data_test(obj_request));
2015         img_request = obj_request->img_request;
2016         rbd_assert(img_request);
2017
2018         rbd_dev = img_request->rbd_dev;
2019         rbd_assert(rbd_dev);
2020         length = (u64)1 << rbd_dev->header.obj_order;
2021         page_count = (u32)calc_pages_for(0, length);
2022
2023         rbd_assert(obj_request->copyup_pages);
2024         ceph_release_page_vector(obj_request->copyup_pages, page_count);
2025         obj_request->copyup_pages = NULL;
2026
2027         /*
2028          * We want the transfer count to reflect the size of the
2029          * original write request.  There is no such thing as a
2030          * successful short write, so if the request was successful
2031          * we can just set it to the originally-requested length.
2032          */
2033         if (!obj_request->result)
2034                 obj_request->xferred = obj_request->length;
2035
2036         /* Finish up with the normal image object callback */
2037
2038         rbd_img_obj_callback(obj_request);
2039 }
2040
2041 static void
2042 rbd_img_obj_parent_read_full_callback(struct rbd_img_request *img_request)
2043 {
2044         struct rbd_obj_request *orig_request;
2045         struct ceph_osd_request *osd_req;
2046         struct ceph_osd_client *osdc;
2047         struct rbd_device *rbd_dev;
2048         struct page **pages;
2049         int result;
2050         u64 obj_size;
2051         u64 xferred;
2052
2053         rbd_assert(img_request_child_test(img_request));
2054
2055         /* First get what we need from the image request */
2056
2057         pages = img_request->copyup_pages;
2058         rbd_assert(pages != NULL);
2059         img_request->copyup_pages = NULL;
2060
2061         orig_request = img_request->obj_request;
2062         rbd_assert(orig_request != NULL);
2063         rbd_assert(orig_request->type == OBJ_REQUEST_BIO);
2064         result = img_request->result;
2065         obj_size = img_request->length;
2066         xferred = img_request->xferred;
2067
2068         rbd_dev = img_request->rbd_dev;
2069         rbd_assert(rbd_dev);
2070         rbd_assert(obj_size == (u64)1 << rbd_dev->header.obj_order);
2071
2072         rbd_img_request_put(img_request);
2073
2074         if (result)
2075                 goto out_err;
2076
2077         /* Allocate the new copyup osd request for the original request */
2078
2079         result = -ENOMEM;
2080         rbd_assert(!orig_request->osd_req);
2081         osd_req = rbd_osd_req_create_copyup(orig_request);
2082         if (!osd_req)
2083                 goto out_err;
2084         orig_request->osd_req = osd_req;
2085         orig_request->copyup_pages = pages;
2086
2087         /* Initialize the copyup op */
2088
2089         osd_req_op_cls_init(osd_req, 0, CEPH_OSD_OP_CALL, "rbd", "copyup");
2090         osd_req_op_cls_request_data_pages(osd_req, 0, pages, obj_size, 0,
2091                                                 false, false);
2092
2093         /* Then the original write request op */
2094
2095         osd_req_op_extent_init(osd_req, 1, CEPH_OSD_OP_WRITE,
2096                                         orig_request->offset,
2097                                         orig_request->length, 0, 0);
2098         osd_req_op_extent_osd_data_bio(osd_req, 1, orig_request->bio_list,
2099                                         orig_request->length);
2100
2101         rbd_osd_req_format_write(orig_request);
2102
2103         /* All set, send it off. */
2104
2105         orig_request->callback = rbd_img_obj_copyup_callback;
2106         osdc = &rbd_dev->rbd_client->client->osdc;
2107         result = rbd_obj_request_submit(osdc, orig_request);
2108         if (!result)
2109                 return;
2110 out_err:
2111         /* Record the error code and complete the request */
2112
2113         orig_request->result = result;
2114         orig_request->xferred = 0;
2115         obj_request_done_set(orig_request);
2116         rbd_obj_request_complete(orig_request);
2117 }
2118
2119 /*
2120  * Read from the parent image the range of data that covers the
2121  * entire target of the given object request.  This is used for
2122  * satisfying a layered image write request when the target of an
2123  * object request from the image request does not exist.
2124  *
2125  * A page array big enough to hold the returned data is allocated
2126  * and supplied to rbd_img_request_fill() as the "data descriptor."
2127  * When the read completes, this page array will be transferred to
2128  * the original object request for the copyup operation.
2129  *
2130  * If an error occurs, record it as the result of the original
2131  * object request and mark it done so it gets completed.
2132  */
2133 static int rbd_img_obj_parent_read_full(struct rbd_obj_request *obj_request)
2134 {
2135         struct rbd_img_request *img_request = NULL;
2136         struct rbd_img_request *parent_request = NULL;
2137         struct rbd_device *rbd_dev;
2138         u64 img_offset;
2139         u64 length;
2140         struct page **pages = NULL;
2141         u32 page_count;
2142         int result;
2143
2144         rbd_assert(obj_request_img_data_test(obj_request));
2145         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2146
2147         img_request = obj_request->img_request;
2148         rbd_assert(img_request != NULL);
2149         rbd_dev = img_request->rbd_dev;
2150         rbd_assert(rbd_dev->parent != NULL);
2151
2152         /*
2153          * First things first.  The original osd request is of no
2154          * use to use any more, we'll need a new one that can hold
2155          * the two ops in a copyup request.  We'll get that later,
2156          * but for now we can release the old one.
2157          */
2158         rbd_osd_req_destroy(obj_request->osd_req);
2159         obj_request->osd_req = NULL;
2160
2161         /*
2162          * Determine the byte range covered by the object in the
2163          * child image to which the original request was to be sent.
2164          */
2165         img_offset = obj_request->img_offset - obj_request->offset;
2166         length = (u64)1 << rbd_dev->header.obj_order;
2167
2168         /*
2169          * There is no defined parent data beyond the parent
2170          * overlap, so limit what we read at that boundary if
2171          * necessary.
2172          */
2173         if (img_offset + length > rbd_dev->parent_overlap) {
2174                 rbd_assert(img_offset < rbd_dev->parent_overlap);
2175                 length = rbd_dev->parent_overlap - img_offset;
2176         }
2177
2178         /*
2179          * Allocate a page array big enough to receive the data read
2180          * from the parent.
2181          */
2182         page_count = (u32)calc_pages_for(0, length);
2183         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2184         if (IS_ERR(pages)) {
2185                 result = PTR_ERR(pages);
2186                 pages = NULL;
2187                 goto out_err;
2188         }
2189
2190         result = -ENOMEM;
2191         parent_request = rbd_img_request_create(rbd_dev->parent,
2192                                                 img_offset, length,
2193                                                 false, true);
2194         if (!parent_request)
2195                 goto out_err;
2196         rbd_obj_request_get(obj_request);
2197         parent_request->obj_request = obj_request;
2198
2199         result = rbd_img_request_fill(parent_request, OBJ_REQUEST_PAGES, pages);
2200         if (result)
2201                 goto out_err;
2202         parent_request->copyup_pages = pages;
2203
2204         parent_request->callback = rbd_img_obj_parent_read_full_callback;
2205         result = rbd_img_request_submit(parent_request);
2206         if (!result)
2207                 return 0;
2208
2209         parent_request->copyup_pages = NULL;
2210         parent_request->obj_request = NULL;
2211         rbd_obj_request_put(obj_request);
2212 out_err:
2213         if (pages)
2214                 ceph_release_page_vector(pages, page_count);
2215         if (parent_request)
2216                 rbd_img_request_put(parent_request);
2217         obj_request->result = result;
2218         obj_request->xferred = 0;
2219         obj_request_done_set(obj_request);
2220
2221         return result;
2222 }
2223
2224 static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
2225 {
2226         struct rbd_obj_request *orig_request;
2227         int result;
2228
2229         rbd_assert(!obj_request_img_data_test(obj_request));
2230
2231         /*
2232          * All we need from the object request is the original
2233          * request and the result of the STAT op.  Grab those, then
2234          * we're done with the request.
2235          */
2236         orig_request = obj_request->obj_request;
2237         obj_request->obj_request = NULL;
2238         rbd_assert(orig_request);
2239         rbd_assert(orig_request->img_request);
2240
2241         result = obj_request->result;
2242         obj_request->result = 0;
2243
2244         dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
2245                 obj_request, orig_request, result,
2246                 obj_request->xferred, obj_request->length);
2247         rbd_obj_request_put(obj_request);
2248
2249         rbd_assert(orig_request);
2250         rbd_assert(orig_request->img_request);
2251
2252         /*
2253          * Our only purpose here is to determine whether the object
2254          * exists, and we don't want to treat the non-existence as
2255          * an error.  If something else comes back, transfer the
2256          * error to the original request and complete it now.
2257          */
2258         if (!result) {
2259                 obj_request_existence_set(orig_request, true);
2260         } else if (result == -ENOENT) {
2261                 obj_request_existence_set(orig_request, false);
2262         } else if (result) {
2263                 orig_request->result = result;
2264                 goto out;
2265         }
2266
2267         /*
2268          * Resubmit the original request now that we have recorded
2269          * whether the target object exists.
2270          */
2271         orig_request->result = rbd_img_obj_request_submit(orig_request);
2272 out:
2273         if (orig_request->result)
2274                 rbd_obj_request_complete(orig_request);
2275         rbd_obj_request_put(orig_request);
2276 }
2277
2278 static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
2279 {
2280         struct rbd_obj_request *stat_request;
2281         struct rbd_device *rbd_dev;
2282         struct ceph_osd_client *osdc;
2283         struct page **pages = NULL;
2284         u32 page_count;
2285         size_t size;
2286         int ret;
2287
2288         /*
2289          * The response data for a STAT call consists of:
2290          *     le64 length;
2291          *     struct {
2292          *         le32 tv_sec;
2293          *         le32 tv_nsec;
2294          *     } mtime;
2295          */
2296         size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
2297         page_count = (u32)calc_pages_for(0, size);
2298         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2299         if (IS_ERR(pages))
2300                 return PTR_ERR(pages);
2301
2302         ret = -ENOMEM;
2303         stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
2304                                                         OBJ_REQUEST_PAGES);
2305         if (!stat_request)
2306                 goto out;
2307
2308         rbd_obj_request_get(obj_request);
2309         stat_request->obj_request = obj_request;
2310         stat_request->pages = pages;
2311         stat_request->page_count = page_count;
2312
2313         rbd_assert(obj_request->img_request);
2314         rbd_dev = obj_request->img_request->rbd_dev;
2315         stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
2316                                                 stat_request);
2317         if (!stat_request->osd_req)
2318                 goto out;
2319         stat_request->callback = rbd_img_obj_exists_callback;
2320
2321         osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
2322         osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
2323                                         false, false);
2324         rbd_osd_req_format_read(stat_request);
2325
2326         osdc = &rbd_dev->rbd_client->client->osdc;
2327         ret = rbd_obj_request_submit(osdc, stat_request);
2328 out:
2329         if (ret)
2330                 rbd_obj_request_put(obj_request);
2331
2332         return ret;
2333 }
2334
2335 static int rbd_img_obj_request_submit(struct rbd_obj_request *obj_request)
2336 {
2337         struct rbd_img_request *img_request;
2338         struct rbd_device *rbd_dev;
2339         bool known;
2340
2341         rbd_assert(obj_request_img_data_test(obj_request));
2342
2343         img_request = obj_request->img_request;
2344         rbd_assert(img_request);
2345         rbd_dev = img_request->rbd_dev;
2346
2347         /*
2348          * Only writes to layered images need special handling.
2349          * Reads and non-layered writes are simple object requests.
2350          * Layered writes that start beyond the end of the overlap
2351          * with the parent have no parent data, so they too are
2352          * simple object requests.  Finally, if the target object is
2353          * known to already exist, its parent data has already been
2354          * copied, so a write to the object can also be handled as a
2355          * simple object request.
2356          */
2357         if (!img_request_write_test(img_request) ||
2358                 !img_request_layered_test(img_request) ||
2359                 rbd_dev->parent_overlap <= obj_request->img_offset ||
2360                 ((known = obj_request_known_test(obj_request)) &&
2361                         obj_request_exists_test(obj_request))) {
2362
2363                 struct rbd_device *rbd_dev;
2364                 struct ceph_osd_client *osdc;
2365
2366                 rbd_dev = obj_request->img_request->rbd_dev;
2367                 osdc = &rbd_dev->rbd_client->client->osdc;
2368
2369                 return rbd_obj_request_submit(osdc, obj_request);
2370         }
2371
2372         /*
2373          * It's a layered write.  The target object might exist but
2374          * we may not know that yet.  If we know it doesn't exist,
2375          * start by reading the data for the full target object from
2376          * the parent so we can use it for a copyup to the target.
2377          */
2378         if (known)
2379                 return rbd_img_obj_parent_read_full(obj_request);
2380
2381         /* We don't know whether the target exists.  Go find out. */
2382
2383         return rbd_img_obj_exists_submit(obj_request);
2384 }
2385
2386 static int rbd_img_request_submit(struct rbd_img_request *img_request)
2387 {
2388         struct rbd_obj_request *obj_request;
2389         struct rbd_obj_request *next_obj_request;
2390
2391         dout("%s: img %p\n", __func__, img_request);
2392         for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
2393                 int ret;
2394
2395                 ret = rbd_img_obj_request_submit(obj_request);
2396                 if (ret)
2397                         return ret;
2398         }
2399
2400         return 0;
2401 }
2402
2403 static void rbd_img_parent_read_callback(struct rbd_img_request *img_request)
2404 {
2405         struct rbd_obj_request *obj_request;
2406         struct rbd_device *rbd_dev;
2407         u64 obj_end;
2408
2409         rbd_assert(img_request_child_test(img_request));
2410
2411         obj_request = img_request->obj_request;
2412         rbd_assert(obj_request);
2413         rbd_assert(obj_request->img_request);
2414
2415         obj_request->result = img_request->result;
2416         if (obj_request->result)
2417                 goto out;
2418
2419         /*
2420          * We need to zero anything beyond the parent overlap
2421          * boundary.  Since rbd_img_obj_request_read_callback()
2422          * will zero anything beyond the end of a short read, an
2423          * easy way to do this is to pretend the data from the
2424          * parent came up short--ending at the overlap boundary.
2425          */
2426         rbd_assert(obj_request->img_offset < U64_MAX - obj_request->length);
2427         obj_end = obj_request->img_offset + obj_request->length;
2428         rbd_dev = obj_request->img_request->rbd_dev;
2429         if (obj_end > rbd_dev->parent_overlap) {
2430                 u64 xferred = 0;
2431
2432                 if (obj_request->img_offset < rbd_dev->parent_overlap)
2433                         xferred = rbd_dev->parent_overlap -
2434                                         obj_request->img_offset;
2435
2436                 obj_request->xferred = min(img_request->xferred, xferred);
2437         } else {
2438                 obj_request->xferred = img_request->xferred;
2439         }
2440 out:
2441         rbd_img_obj_request_read_callback(obj_request);
2442         rbd_obj_request_complete(obj_request);
2443 }
2444
2445 static void rbd_img_parent_read(struct rbd_obj_request *obj_request)
2446 {
2447         struct rbd_device *rbd_dev;
2448         struct rbd_img_request *img_request;
2449         int result;
2450
2451         rbd_assert(obj_request_img_data_test(obj_request));
2452         rbd_assert(obj_request->img_request != NULL);
2453         rbd_assert(obj_request->result == (s32) -ENOENT);
2454         rbd_assert(obj_request->type == OBJ_REQUEST_BIO);
2455
2456         rbd_dev = obj_request->img_request->rbd_dev;
2457         rbd_assert(rbd_dev->parent != NULL);
2458         /* rbd_read_finish(obj_request, obj_request->length); */
2459         img_request = rbd_img_request_create(rbd_dev->parent,
2460                                                 obj_request->img_offset,
2461                                                 obj_request->length,
2462                                                 false, true);
2463         result = -ENOMEM;
2464         if (!img_request)
2465                 goto out_err;
2466
2467         rbd_obj_request_get(obj_request);
2468         img_request->obj_request = obj_request;
2469
2470         result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2471                                         obj_request->bio_list);
2472         if (result)
2473                 goto out_err;
2474
2475         img_request->callback = rbd_img_parent_read_callback;
2476         result = rbd_img_request_submit(img_request);
2477         if (result)
2478                 goto out_err;
2479
2480         return;
2481 out_err:
2482         if (img_request)
2483                 rbd_img_request_put(img_request);
2484         obj_request->result = result;
2485         obj_request->xferred = 0;
2486         obj_request_done_set(obj_request);
2487 }
2488
2489 static int rbd_obj_notify_ack(struct rbd_device *rbd_dev,
2490                                    u64 ver, u64 notify_id)
2491 {
2492         struct rbd_obj_request *obj_request;
2493         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2494         int ret;
2495
2496         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2497                                                         OBJ_REQUEST_NODATA);
2498         if (!obj_request)
2499                 return -ENOMEM;
2500
2501         ret = -ENOMEM;
2502         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2503         if (!obj_request->osd_req)
2504                 goto out;
2505         obj_request->callback = rbd_obj_request_put;
2506
2507         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
2508                                         notify_id, ver, 0);
2509         rbd_osd_req_format_read(obj_request);
2510
2511         ret = rbd_obj_request_submit(osdc, obj_request);
2512 out:
2513         if (ret)
2514                 rbd_obj_request_put(obj_request);
2515
2516         return ret;
2517 }
2518
2519 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
2520 {
2521         struct rbd_device *rbd_dev = (struct rbd_device *)data;
2522         u64 hver;
2523         int rc;
2524
2525         if (!rbd_dev)
2526                 return;
2527
2528         dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
2529                 rbd_dev->header_name, (unsigned long long) notify_id,
2530                 (unsigned int) opcode);
2531         rc = rbd_dev_refresh(rbd_dev, &hver);
2532         if (rc)
2533                 rbd_warn(rbd_dev, "got notification but failed to "
2534                            " update snaps: %d\n", rc);
2535
2536         rbd_obj_notify_ack(rbd_dev, hver, notify_id);
2537 }
2538
2539 /*
2540  * Request sync osd watch/unwatch.  The value of "start" determines
2541  * whether a watch request is being initiated or torn down.
2542  */
2543 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev, int start)
2544 {
2545         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2546         struct rbd_obj_request *obj_request;
2547         int ret;
2548
2549         rbd_assert(start ^ !!rbd_dev->watch_event);
2550         rbd_assert(start ^ !!rbd_dev->watch_request);
2551
2552         if (start) {
2553                 ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
2554                                                 &rbd_dev->watch_event);
2555                 if (ret < 0)
2556                         return ret;
2557                 rbd_assert(rbd_dev->watch_event != NULL);
2558         }
2559
2560         ret = -ENOMEM;
2561         obj_request = rbd_obj_request_create(rbd_dev->header_name, 0, 0,
2562                                                         OBJ_REQUEST_NODATA);
2563         if (!obj_request)
2564                 goto out_cancel;
2565
2566         obj_request->osd_req = rbd_osd_req_create(rbd_dev, true, obj_request);
2567         if (!obj_request->osd_req)
2568                 goto out_cancel;
2569
2570         if (start)
2571                 ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
2572         else
2573                 ceph_osdc_unregister_linger_request(osdc,
2574                                         rbd_dev->watch_request->osd_req);
2575
2576         osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
2577                                 rbd_dev->watch_event->cookie,
2578                                 rbd_dev->header.obj_version, start);
2579         rbd_osd_req_format_write(obj_request);
2580
2581         ret = rbd_obj_request_submit(osdc, obj_request);
2582         if (ret)
2583                 goto out_cancel;
2584         ret = rbd_obj_request_wait(obj_request);
2585         if (ret)
2586                 goto out_cancel;
2587         ret = obj_request->result;
2588         if (ret)
2589                 goto out_cancel;
2590
2591         /*
2592          * A watch request is set to linger, so the underlying osd
2593          * request won't go away until we unregister it.  We retain
2594          * a pointer to the object request during that time (in
2595          * rbd_dev->watch_request), so we'll keep a reference to
2596          * it.  We'll drop that reference (below) after we've
2597          * unregistered it.
2598          */
2599         if (start) {
2600                 rbd_dev->watch_request = obj_request;
2601
2602                 return 0;
2603         }
2604
2605         /* We have successfully torn down the watch request */
2606
2607         rbd_obj_request_put(rbd_dev->watch_request);
2608         rbd_dev->watch_request = NULL;
2609 out_cancel:
2610         /* Cancel the event if we're tearing down, or on error */
2611         ceph_osdc_cancel_event(rbd_dev->watch_event);
2612         rbd_dev->watch_event = NULL;
2613         if (obj_request)
2614                 rbd_obj_request_put(obj_request);
2615
2616         return ret;
2617 }
2618
2619 /*
2620  * Synchronous osd object method call
2621  */
2622 static int rbd_obj_method_sync(struct rbd_device *rbd_dev,
2623                              const char *object_name,
2624                              const char *class_name,
2625                              const char *method_name,
2626                              const void *outbound,
2627                              size_t outbound_size,
2628                              void *inbound,
2629                              size_t inbound_size,
2630                              u64 *version)
2631 {
2632         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2633         struct rbd_obj_request *obj_request;
2634         struct page **pages;
2635         u32 page_count;
2636         int ret;
2637
2638         /*
2639          * Method calls are ultimately read operations.  The result
2640          * should placed into the inbound buffer provided.  They
2641          * also supply outbound data--parameters for the object
2642          * method.  Currently if this is present it will be a
2643          * snapshot id.
2644          */
2645         page_count = (u32)calc_pages_for(0, inbound_size);
2646         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2647         if (IS_ERR(pages))
2648                 return PTR_ERR(pages);
2649
2650         ret = -ENOMEM;
2651         obj_request = rbd_obj_request_create(object_name, 0, inbound_size,
2652                                                         OBJ_REQUEST_PAGES);
2653         if (!obj_request)
2654                 goto out;
2655
2656         obj_request->pages = pages;
2657         obj_request->page_count = page_count;
2658
2659         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2660         if (!obj_request->osd_req)
2661                 goto out;
2662
2663         osd_req_op_cls_init(obj_request->osd_req, 0, CEPH_OSD_OP_CALL,
2664                                         class_name, method_name);
2665         if (outbound_size) {
2666                 struct ceph_pagelist *pagelist;
2667
2668                 pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
2669                 if (!pagelist)
2670                         goto out;
2671
2672                 ceph_pagelist_init(pagelist);
2673                 ceph_pagelist_append(pagelist, outbound, outbound_size);
2674                 osd_req_op_cls_request_data_pagelist(obj_request->osd_req, 0,
2675                                                 pagelist);
2676         }
2677         osd_req_op_cls_response_data_pages(obj_request->osd_req, 0,
2678                                         obj_request->pages, inbound_size,
2679                                         0, false, false);
2680         rbd_osd_req_format_read(obj_request);
2681
2682         ret = rbd_obj_request_submit(osdc, obj_request);
2683         if (ret)
2684                 goto out;
2685         ret = rbd_obj_request_wait(obj_request);
2686         if (ret)
2687                 goto out;
2688
2689         ret = obj_request->result;
2690         if (ret < 0)
2691                 goto out;
2692
2693         rbd_assert(obj_request->xferred < (u64)INT_MAX);
2694         ret = (int)obj_request->xferred;
2695         ceph_copy_from_page_vector(pages, inbound, 0, obj_request->xferred);
2696         if (version)
2697                 *version = obj_request->version;
2698 out:
2699         if (obj_request)
2700                 rbd_obj_request_put(obj_request);
2701         else
2702                 ceph_release_page_vector(pages, page_count);
2703
2704         return ret;
2705 }
2706
2707 static void rbd_request_fn(struct request_queue *q)
2708                 __releases(q->queue_lock) __acquires(q->queue_lock)
2709 {
2710         struct rbd_device *rbd_dev = q->queuedata;
2711         bool read_only = rbd_dev->mapping.read_only;
2712         struct request *rq;
2713         int result;
2714
2715         while ((rq = blk_fetch_request(q))) {
2716                 bool write_request = rq_data_dir(rq) == WRITE;
2717                 struct rbd_img_request *img_request;
2718                 u64 offset;
2719                 u64 length;
2720
2721                 /* Ignore any non-FS requests that filter through. */
2722
2723                 if (rq->cmd_type != REQ_TYPE_FS) {
2724                         dout("%s: non-fs request type %d\n", __func__,
2725                                 (int) rq->cmd_type);
2726                         __blk_end_request_all(rq, 0);
2727                         continue;
2728                 }
2729
2730                 /* Ignore/skip any zero-length requests */
2731
2732                 offset = (u64) blk_rq_pos(rq) << SECTOR_SHIFT;
2733                 length = (u64) blk_rq_bytes(rq);
2734
2735                 if (!length) {
2736                         dout("%s: zero-length request\n", __func__);
2737                         __blk_end_request_all(rq, 0);
2738                         continue;
2739                 }
2740
2741                 spin_unlock_irq(q->queue_lock);
2742
2743                 /* Disallow writes to a read-only device */
2744
2745                 if (write_request) {
2746                         result = -EROFS;
2747                         if (read_only)
2748                                 goto end_request;
2749                         rbd_assert(rbd_dev->spec->snap_id == CEPH_NOSNAP);
2750                 }
2751
2752                 /*
2753                  * Quit early if the mapped snapshot no longer
2754                  * exists.  It's still possible the snapshot will
2755                  * have disappeared by the time our request arrives
2756                  * at the osd, but there's no sense in sending it if
2757                  * we already know.
2758                  */
2759                 if (!test_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags)) {
2760                         dout("request for non-existent snapshot");
2761                         rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
2762                         result = -ENXIO;
2763                         goto end_request;
2764                 }
2765
2766                 result = -EINVAL;
2767                 if (WARN_ON(offset && length > U64_MAX - offset + 1))
2768                         goto end_request;       /* Shouldn't happen */
2769
2770                 result = -ENOMEM;
2771                 img_request = rbd_img_request_create(rbd_dev, offset, length,
2772                                                         write_request, false);
2773                 if (!img_request)
2774                         goto end_request;
2775
2776                 img_request->rq = rq;
2777
2778                 result = rbd_img_request_fill(img_request, OBJ_REQUEST_BIO,
2779                                                 rq->bio);
2780                 if (!result)
2781                         result = rbd_img_request_submit(img_request);
2782                 if (result)
2783                         rbd_img_request_put(img_request);
2784 end_request:
2785                 spin_lock_irq(q->queue_lock);
2786                 if (result < 0) {
2787                         rbd_warn(rbd_dev, "%s %llx at %llx result %d\n",
2788                                 write_request ? "write" : "read",
2789                                 length, offset, result);
2790
2791                         __blk_end_request_all(rq, result);
2792                 }
2793         }
2794 }
2795
2796 /*
2797  * a queue callback. Makes sure that we don't create a bio that spans across
2798  * multiple osd objects. One exception would be with a single page bios,
2799  * which we handle later at bio_chain_clone_range()
2800  */
2801 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
2802                           struct bio_vec *bvec)
2803 {
2804         struct rbd_device *rbd_dev = q->queuedata;
2805         sector_t sector_offset;
2806         sector_t sectors_per_obj;
2807         sector_t obj_sector_offset;
2808         int ret;
2809
2810         /*
2811          * Find how far into its rbd object the partition-relative
2812          * bio start sector is to offset relative to the enclosing
2813          * device.
2814          */
2815         sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
2816         sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
2817         obj_sector_offset = sector_offset & (sectors_per_obj - 1);
2818
2819         /*
2820          * Compute the number of bytes from that offset to the end
2821          * of the object.  Account for what's already used by the bio.
2822          */
2823         ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
2824         if (ret > bmd->bi_size)
2825                 ret -= bmd->bi_size;
2826         else
2827                 ret = 0;
2828
2829         /*
2830          * Don't send back more than was asked for.  And if the bio
2831          * was empty, let the whole thing through because:  "Note
2832          * that a block device *must* allow a single page to be
2833          * added to an empty bio."
2834          */
2835         rbd_assert(bvec->bv_len <= PAGE_SIZE);
2836         if (ret > (int) bvec->bv_len || !bmd->bi_size)
2837                 ret = (int) bvec->bv_len;
2838
2839         return ret;
2840 }
2841
2842 static void rbd_free_disk(struct rbd_device *rbd_dev)
2843 {
2844         struct gendisk *disk = rbd_dev->disk;
2845
2846         if (!disk)
2847                 return;
2848
2849         if (disk->flags & GENHD_FL_UP)
2850                 del_gendisk(disk);
2851         if (disk->queue)
2852                 blk_cleanup_queue(disk->queue);
2853         put_disk(disk);
2854 }
2855
2856 static int rbd_obj_read_sync(struct rbd_device *rbd_dev,
2857                                 const char *object_name,
2858                                 u64 offset, u64 length,
2859                                 void *buf, u64 *version)
2860
2861 {
2862         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
2863         struct rbd_obj_request *obj_request;
2864         struct page **pages = NULL;
2865         u32 page_count;
2866         size_t size;
2867         int ret;
2868
2869         page_count = (u32) calc_pages_for(offset, length);
2870         pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
2871         if (IS_ERR(pages))
2872                 ret = PTR_ERR(pages);
2873
2874         ret = -ENOMEM;
2875         obj_request = rbd_obj_request_create(object_name, offset, length,
2876                                                         OBJ_REQUEST_PAGES);
2877         if (!obj_request)
2878                 goto out;
2879
2880         obj_request->pages = pages;
2881         obj_request->page_count = page_count;
2882
2883         obj_request->osd_req = rbd_osd_req_create(rbd_dev, false, obj_request);
2884         if (!obj_request->osd_req)
2885                 goto out;
2886
2887         osd_req_op_extent_init(obj_request->osd_req, 0, CEPH_OSD_OP_READ,
2888                                         offset, length, 0, 0);
2889         osd_req_op_extent_osd_data_pages(obj_request->osd_req, 0,
2890                                         obj_request->pages,
2891                                         obj_request->length,
2892                                         obj_request->offset & ~PAGE_MASK,
2893                                         false, false);
2894         rbd_osd_req_format_read(obj_request);
2895
2896         ret = rbd_obj_request_submit(osdc, obj_request);
2897         if (ret)
2898                 goto out;
2899         ret = rbd_obj_request_wait(obj_request);
2900         if (ret)
2901                 goto out;
2902
2903         ret = obj_request->result;
2904         if (ret < 0)
2905                 goto out;
2906
2907         rbd_assert(obj_request->xferred <= (u64) SIZE_MAX);
2908         size = (size_t) obj_request->xferred;
2909         ceph_copy_from_page_vector(pages, buf, 0, size);
2910         rbd_assert(size <= (size_t) INT_MAX);
2911         ret = (int) size;
2912         if (version)
2913                 *version = obj_request->version;
2914 out:
2915         if (obj_request)
2916                 rbd_obj_request_put(obj_request);
2917         else
2918                 ceph_release_page_vector(pages, page_count);
2919
2920         return ret;
2921 }
2922
2923 /*
2924  * Read the complete header for the given rbd device.
2925  *
2926  * Returns a pointer to a dynamically-allocated buffer containing
2927  * the complete and validated header.  Caller can pass the address
2928  * of a variable that will be filled in with the version of the
2929  * header object at the time it was read.
2930  *
2931  * Returns a pointer-coded errno if a failure occurs.
2932  */
2933 static struct rbd_image_header_ondisk *
2934 rbd_dev_v1_header_read(struct rbd_device *rbd_dev, u64 *version)
2935 {
2936         struct rbd_image_header_ondisk *ondisk = NULL;
2937         u32 snap_count = 0;
2938         u64 names_size = 0;
2939         u32 want_count;
2940         int ret;
2941
2942         /*
2943          * The complete header will include an array of its 64-bit
2944          * snapshot ids, followed by the names of those snapshots as
2945          * a contiguous block of NUL-terminated strings.  Note that
2946          * the number of snapshots could change by the time we read
2947          * it in, in which case we re-read it.
2948          */
2949         do {
2950                 size_t size;
2951
2952                 kfree(ondisk);
2953
2954                 size = sizeof (*ondisk);
2955                 size += snap_count * sizeof (struct rbd_image_snap_ondisk);
2956                 size += names_size;
2957                 ondisk = kmalloc(size, GFP_KERNEL);
2958                 if (!ondisk)
2959                         return ERR_PTR(-ENOMEM);
2960
2961                 ret = rbd_obj_read_sync(rbd_dev, rbd_dev->header_name,
2962                                        0, size, ondisk, version);
2963                 if (ret < 0)
2964                         goto out_err;
2965                 if (WARN_ON((size_t) ret < size)) {
2966                         ret = -ENXIO;
2967                         rbd_warn(rbd_dev, "short header read (want %zd got %d)",
2968                                 size, ret);
2969                         goto out_err;
2970                 }
2971                 if (!rbd_dev_ondisk_valid(ondisk)) {
2972                         ret = -ENXIO;
2973                         rbd_warn(rbd_dev, "invalid header");
2974                         goto out_err;
2975                 }
2976
2977                 names_size = le64_to_cpu(ondisk->snap_names_len);
2978                 want_count = snap_count;
2979                 snap_count = le32_to_cpu(ondisk->snap_count);
2980         } while (snap_count != want_count);
2981
2982         return ondisk;
2983
2984 out_err:
2985         kfree(ondisk);
2986
2987         return ERR_PTR(ret);
2988 }
2989
2990 /*
2991  * reload the ondisk the header
2992  */
2993 static int rbd_read_header(struct rbd_device *rbd_dev,
2994                            struct rbd_image_header *header)
2995 {
2996         struct rbd_image_header_ondisk *ondisk;
2997         u64 ver = 0;
2998         int ret;
2999
3000         ondisk = rbd_dev_v1_header_read(rbd_dev, &ver);
3001         if (IS_ERR(ondisk))
3002                 return PTR_ERR(ondisk);
3003         ret = rbd_header_from_disk(header, ondisk);
3004         if (ret >= 0)
3005                 header->obj_version = ver;
3006         kfree(ondisk);
3007
3008         return ret;
3009 }
3010
3011 static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
3012 {
3013         struct rbd_snap *snap;
3014         struct rbd_snap *next;
3015
3016         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
3017                 rbd_remove_snap_dev(snap);
3018 }
3019
3020 static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
3021 {
3022         sector_t size;
3023
3024         if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
3025                 return;
3026
3027         size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
3028         dout("setting size to %llu sectors", (unsigned long long) size);
3029         rbd_dev->mapping.size = (u64) size;
3030         set_capacity(rbd_dev->disk, size);
3031 }
3032
3033 /*
3034  * only read the first part of the ondisk header, without the snaps info
3035  */
3036 static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
3037 {
3038         int ret;
3039         struct rbd_image_header h;
3040
3041         ret = rbd_read_header(rbd_dev, &h);
3042         if (ret < 0)
3043                 return ret;
3044
3045         down_write(&rbd_dev->header_rwsem);
3046
3047         /* Update image size, and check for resize of mapped image */
3048         rbd_dev->header.image_size = h.image_size;
3049         rbd_update_mapping_size(rbd_dev);
3050
3051         /* rbd_dev->header.object_prefix shouldn't change */
3052         kfree(rbd_dev->header.snap_sizes);
3053         kfree(rbd_dev->header.snap_names);
3054         /* osd requests may still refer to snapc */
3055         ceph_put_snap_context(rbd_dev->header.snapc);
3056
3057         if (hver)
3058                 *hver = h.obj_version;
3059         rbd_dev->header.obj_version = h.obj_version;
3060         rbd_dev->header.image_size = h.image_size;
3061         rbd_dev->header.snapc = h.snapc;
3062         rbd_dev->header.snap_names = h.snap_names;
3063         rbd_dev->header.snap_sizes = h.snap_sizes;
3064         /* Free the extra copy of the object prefix */
3065         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
3066         kfree(h.object_prefix);
3067
3068         ret = rbd_dev_snaps_update(rbd_dev);
3069         if (!ret)
3070                 ret = rbd_dev_snaps_register(rbd_dev);
3071
3072         up_write(&rbd_dev->header_rwsem);
3073
3074         return ret;
3075 }
3076
3077 static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
3078 {
3079         int ret;
3080
3081         rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
3082         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
3083         if (rbd_dev->image_format == 1)
3084                 ret = rbd_dev_v1_refresh(rbd_dev, hver);
3085         else
3086                 ret = rbd_dev_v2_refresh(rbd_dev, hver);
3087         mutex_unlock(&ctl_mutex);
3088         revalidate_disk(rbd_dev->disk);
3089
3090         return ret;
3091 }
3092
3093 static int rbd_init_disk(struct rbd_device *rbd_dev)
3094 {
3095         struct gendisk *disk;
3096         struct request_queue *q;
3097         u64 segment_size;
3098
3099         /* create gendisk info */
3100         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
3101         if (!disk)
3102                 return -ENOMEM;
3103
3104         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
3105                  rbd_dev->dev_id);
3106         disk->major = rbd_dev->major;
3107         disk->first_minor = 0;
3108         disk->fops = &rbd_bd_ops;
3109         disk->private_data = rbd_dev;
3110
3111         q = blk_init_queue(rbd_request_fn, &rbd_dev->lock);
3112         if (!q)
3113                 goto out_disk;
3114
3115         /* We use the default size, but let's be explicit about it. */
3116         blk_queue_physical_block_size(q, SECTOR_SIZE);
3117
3118         /* set io sizes to object size */
3119         segment_size = rbd_obj_bytes(&rbd_dev->header);
3120         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
3121         blk_queue_max_segment_size(q, segment_size);
3122         blk_queue_io_min(q, segment_size);
3123         blk_queue_io_opt(q, segment_size);
3124
3125         blk_queue_merge_bvec(q, rbd_merge_bvec);
3126         disk->queue = q;
3127
3128         q->queuedata = rbd_dev;
3129
3130         rbd_dev->disk = disk;
3131
3132         set_capacity(rbd_dev->disk, rbd_dev->mapping.size / SECTOR_SIZE);
3133
3134         return 0;
3135 out_disk:
3136         put_disk(disk);
3137
3138         return -ENOMEM;
3139 }
3140
3141 /*
3142   sysfs
3143 */
3144
3145 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
3146 {
3147         return container_of(dev, struct rbd_device, dev);
3148 }
3149
3150 static ssize_t rbd_size_show(struct device *dev,
3151                              struct device_attribute *attr, char *buf)
3152 {
3153         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3154         sector_t size;
3155
3156         down_read(&rbd_dev->header_rwsem);
3157         size = get_capacity(rbd_dev->disk);
3158         up_read(&rbd_dev->header_rwsem);
3159
3160         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
3161 }
3162
3163 /*
3164  * Note this shows the features for whatever's mapped, which is not
3165  * necessarily the base image.
3166  */
3167 static ssize_t rbd_features_show(struct device *dev,
3168                              struct device_attribute *attr, char *buf)
3169 {
3170         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3171
3172         return sprintf(buf, "0x%016llx\n",
3173                         (unsigned long long) rbd_dev->mapping.features);
3174 }
3175
3176 static ssize_t rbd_major_show(struct device *dev,
3177                               struct device_attribute *attr, char *buf)
3178 {
3179         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3180
3181         return sprintf(buf, "%d\n", rbd_dev->major);
3182 }
3183
3184 static ssize_t rbd_client_id_show(struct device *dev,
3185                                   struct device_attribute *attr, char *buf)
3186 {
3187         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3188
3189         return sprintf(buf, "client%lld\n",
3190                         ceph_client_id(rbd_dev->rbd_client->client));
3191 }
3192
3193 static ssize_t rbd_pool_show(struct device *dev,
3194                              struct device_attribute *attr, char *buf)
3195 {
3196         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3197
3198         return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
3199 }
3200
3201 static ssize_t rbd_pool_id_show(struct device *dev,
3202                              struct device_attribute *attr, char *buf)
3203 {
3204         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3205
3206         return sprintf(buf, "%llu\n",
3207                 (unsigned long long) rbd_dev->spec->pool_id);
3208 }
3209
3210 static ssize_t rbd_name_show(struct device *dev,
3211                              struct device_attribute *attr, char *buf)
3212 {
3213         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3214
3215         if (rbd_dev->spec->image_name)
3216                 return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
3217
3218         return sprintf(buf, "(unknown)\n");
3219 }
3220
3221 static ssize_t rbd_image_id_show(struct device *dev,
3222                              struct device_attribute *attr, char *buf)
3223 {
3224         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3225
3226         return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
3227 }
3228
3229 /*
3230  * Shows the name of the currently-mapped snapshot (or
3231  * RBD_SNAP_HEAD_NAME for the base image).
3232  */
3233 static ssize_t rbd_snap_show(struct device *dev,
3234                              struct device_attribute *attr,
3235                              char *buf)
3236 {
3237         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3238
3239         return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
3240 }
3241
3242 /*
3243  * For an rbd v2 image, shows the pool id, image id, and snapshot id
3244  * for the parent image.  If there is no parent, simply shows
3245  * "(no parent image)".
3246  */
3247 static ssize_t rbd_parent_show(struct device *dev,
3248                              struct device_attribute *attr,
3249                              char *buf)
3250 {
3251         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3252         struct rbd_spec *spec = rbd_dev->parent_spec;
3253         int count;
3254         char *bufp = buf;
3255
3256         if (!spec)
3257                 return sprintf(buf, "(no parent image)\n");
3258
3259         count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
3260                         (unsigned long long) spec->pool_id, spec->pool_name);
3261         if (count < 0)
3262                 return count;
3263         bufp += count;
3264
3265         count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
3266                         spec->image_name ? spec->image_name : "(unknown)");
3267         if (count < 0)
3268                 return count;
3269         bufp += count;
3270
3271         count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
3272                         (unsigned long long) spec->snap_id, spec->snap_name);
3273         if (count < 0)
3274                 return count;
3275         bufp += count;
3276
3277         count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
3278         if (count < 0)
3279                 return count;
3280         bufp += count;
3281
3282         return (ssize_t) (bufp - buf);
3283 }
3284
3285 static ssize_t rbd_image_refresh(struct device *dev,
3286                                  struct device_attribute *attr,
3287                                  const char *buf,
3288                                  size_t size)
3289 {
3290         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
3291         int ret;
3292
3293         ret = rbd_dev_refresh(rbd_dev, NULL);
3294
3295         return ret < 0 ? ret : size;
3296 }
3297
/* Read-only attributes describing the mapped image */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(features, S_IRUGO, rbd_features_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);

/* Write-only trigger: writing to it refreshes the header information */
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
3309
3310 static struct attribute *rbd_attrs[] = {
3311         &dev_attr_size.attr,
3312         &dev_attr_features.attr,
3313         &dev_attr_major.attr,
3314         &dev_attr_client_id.attr,
3315         &dev_attr_pool.attr,
3316         &dev_attr_pool_id.attr,
3317         &dev_attr_name.attr,
3318         &dev_attr_image_id.attr,
3319         &dev_attr_current_snap.attr,
3320         &dev_attr_parent.attr,
3321         &dev_attr_refresh.attr,
3322         NULL
3323 };
3324
3325 static struct attribute_group rbd_attr_group = {
3326         .attrs = rbd_attrs,
3327 };
3328
3329 static const struct attribute_group *rbd_attr_groups[] = {
3330         &rbd_attr_group,
3331         NULL
3332 };
3333
/*
 * Release callback for rbd_device_type.  It does nothing: no memory is
 * freed here.
 * NOTE(review): the rbd_device is presumably torn down elsewhere --
 * confirm against the removal path.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
	/* intentionally empty */
}
3337
3338 static struct device_type rbd_device_type = {
3339         .name           = "rbd",
3340         .groups         = rbd_attr_groups,
3341         .release        = rbd_sysfs_dev_release,
3342 };
3343
3344
3345 /*
3346   sysfs - snapshots
3347 */
3348
3349 static ssize_t rbd_snap_size_show(struct device *dev,
3350                                   struct device_attribute *attr,
3351                                   char *buf)
3352 {
3353         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3354
3355         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
3356 }
3357
3358 static ssize_t rbd_snap_id_show(struct device *dev,
3359                                 struct device_attribute *attr,
3360                                 char *buf)
3361 {
3362         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3363
3364         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
3365 }
3366
3367 static ssize_t rbd_snap_features_show(struct device *dev,
3368                                 struct device_attribute *attr,
3369                                 char *buf)
3370 {
3371         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3372
3373         return sprintf(buf, "0x%016llx\n",
3374                         (unsigned long long) snap->features);
3375 }
3376
/* Read-only sysfs attributes defined for each snapshot device */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
static DEVICE_ATTR(snap_features, S_IRUGO, rbd_snap_features_show, NULL);
3380
/* NULL-terminated list of snapshot attributes; used by rbd_snap_attr_group */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        &dev_attr_snap_features.attr,
        NULL,
};
3387
/* Unnamed attribute group wrapping the rbd_snap_attrs list */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
3391
3392 static void rbd_snap_dev_release(struct device *dev)
3393 {
3394         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
3395         kfree(snap->name);
3396         kfree(snap);
3397 }
3398
/* NULL-terminated list of snapshot attribute groups */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};
3403
/*
 * Device type for snapshot devices: supplies the snapshot attribute
 * groups and the release callback that frees the rbd_snap.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
3408
3409 static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
3410 {
3411         kref_get(&spec->kref);
3412
3413         return spec;
3414 }
3415
3416 static void rbd_spec_free(struct kref *kref);
3417 static void rbd_spec_put(struct rbd_spec *spec)
3418 {
3419         if (spec)
3420                 kref_put(&spec->kref, rbd_spec_free);
3421 }
3422
3423 static struct rbd_spec *rbd_spec_alloc(void)
3424 {
3425         struct rbd_spec *spec;
3426
3427         spec = kzalloc(sizeof (*spec), GFP_KERNEL);
3428         if (!spec)
3429                 return NULL;
3430         kref_init(&spec->kref);
3431
3432         return spec;
3433 }
3434
3435 static void rbd_spec_free(struct kref *kref)
3436 {
3437         struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
3438
3439         kfree(spec->pool_name);
3440         kfree(spec->image_id);
3441         kfree(spec->image_name);
3442         kfree(spec->snap_name);
3443         kfree(spec);
3444 }
3445
3446 static struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
3447                                 struct rbd_spec *spec)
3448 {
3449         struct rbd_device *rbd_dev;
3450
3451         rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
3452         if (!rbd_dev)
3453                 return NULL;
3454
3455         spin_lock_init(&rbd_dev->lock);
3456         rbd_dev->flags = 0;
3457         INIT_LIST_HEAD(&rbd_dev->node);
3458         INIT_LIST_HEAD(&rbd_dev->snaps);
3459         init_rwsem(&rbd_dev->header_rwsem);
3460
3461         rbd_dev->spec = spec;
3462         rbd_dev->rbd_client = rbdc;
3463
3464         /* Initialize the layout used for all rbd requests */
3465
3466         rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3467         rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
3468         rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
3469         rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
3470
3471         return rbd_dev;
3472 }
3473
3474 static void rbd_dev_destroy(struct rbd_device *rbd_dev)
3475 {
3476         rbd_spec_put(rbd_dev->parent_spec);
3477         kfree(rbd_dev->header_name);
3478         rbd_put_client(rbd_dev->rbd_client);
3479         rbd_spec_put(rbd_dev->spec);
3480         kfree(rbd_dev);
3481 }
3482
3483 static bool rbd_snap_registered(struct rbd_snap *snap)
3484 {
3485         bool ret = snap->dev.type == &rbd_snap_device_type;
3486         bool reg = device_is_registered(&snap->dev);
3487
3488         rbd_assert(!ret ^ reg);
3489
3490         return ret;
3491 }
3492
3493 static void rbd_remove_snap_dev(struct rbd_snap *snap)
3494 {
3495         list_del(&snap->node);
3496         if (device_is_registered(&snap->dev))
3497                 device_unregister(&snap->dev);
3498 }
3499
3500 static int rbd_register_snap_dev(struct rbd_snap *snap,
3501                                   struct device *parent)
3502 {
3503         struct device *dev = &snap->dev;
3504         int ret;
3505
3506         dev->type = &rbd_snap_device_type;
3507         dev->parent = parent;
3508         dev->release = rbd_snap_dev_release;
3509         dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
3510         dout("%s: registering device for snapshot %s\n", __func__, snap->name);
3511
3512         ret = device_register(dev);
3513
3514         return ret;
3515 }
3516
3517 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
3518                                                 const char *snap_name,
3519                                                 u64 snap_id, u64 snap_size,
3520                                                 u64 snap_features)
3521 {
3522         struct rbd_snap *snap;
3523         int ret;
3524
3525         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
3526         if (!snap)
3527                 return ERR_PTR(-ENOMEM);
3528
3529         ret = -ENOMEM;
3530         snap->name = kstrdup(snap_name, GFP_KERNEL);
3531         if (!snap->name)
3532                 goto err;
3533
3534         snap->id = snap_id;
3535         snap->size = snap_size;
3536         snap->features = snap_features;
3537
3538         return snap;
3539
3540 err:
3541         kfree(snap->name);
3542         kfree(snap);
3543
3544         return ERR_PTR(ret);
3545 }
3546
3547 static char *rbd_dev_v1_snap_info(struct rbd_device *rbd_dev, u32 which,
3548                 u64 *snap_size, u64 *snap_features)
3549 {
3550         char *snap_name;
3551
3552         rbd_assert(which < rbd_dev->header.snapc->num_snaps);
3553
3554         *snap_size = rbd_dev->header.snap_sizes[which];
3555         *snap_features = 0;     /* No features for v1 */
3556
3557         /* Skip over names until we find the one we are looking for */
3558
3559         snap_name = rbd_dev->header.snap_names;
3560         while (which--)
3561                 snap_name += strlen(snap_name) + 1;
3562
3563         return snap_name;
3564 }
3565
3566 /*
3567  * Get the size and object order for an image snapshot, or if
3568  * snap_id is CEPH_NOSNAP, gets this information for the base
3569  * image.
3570  */
3571 static int _rbd_dev_v2_snap_size(struct rbd_device *rbd_dev, u64 snap_id,
3572                                 u8 *order, u64 *snap_size)
3573 {
3574         __le64 snapid = cpu_to_le64(snap_id);
3575         int ret;
3576         struct {
3577                 u8 order;
3578                 __le64 size;
3579         } __attribute__ ((packed)) size_buf = { 0 };
3580
3581         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3582                                 "rbd", "get_size",
3583                                 &snapid, sizeof (snapid),
3584                                 &size_buf, sizeof (size_buf), NULL);
3585         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3586         if (ret < 0)
3587                 return ret;
3588         if (ret < sizeof (size_buf))
3589                 return -ERANGE;
3590
3591         *order = size_buf.order;
3592         *snap_size = le64_to_cpu(size_buf.size);
3593
3594         dout("  snap_id 0x%016llx order = %u, snap_size = %llu\n",
3595                 (unsigned long long)snap_id, (unsigned int)*order,
3596                 (unsigned long long)*snap_size);
3597
3598         return 0;
3599 }
3600
3601 static int rbd_dev_v2_image_size(struct rbd_device *rbd_dev)
3602 {
3603         return _rbd_dev_v2_snap_size(rbd_dev, CEPH_NOSNAP,
3604                                         &rbd_dev->header.obj_order,
3605                                         &rbd_dev->header.image_size);
3606 }
3607
3608 static int rbd_dev_v2_object_prefix(struct rbd_device *rbd_dev)
3609 {
3610         void *reply_buf;
3611         int ret;
3612         void *p;
3613
3614         reply_buf = kzalloc(RBD_OBJ_PREFIX_LEN_MAX, GFP_KERNEL);
3615         if (!reply_buf)
3616                 return -ENOMEM;
3617
3618         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3619                                 "rbd", "get_object_prefix", NULL, 0,
3620                                 reply_buf, RBD_OBJ_PREFIX_LEN_MAX, NULL);
3621         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3622         if (ret < 0)
3623                 goto out;
3624
3625         p = reply_buf;
3626         rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
3627                                                 p + ret, NULL, GFP_NOIO);
3628         ret = 0;
3629
3630         if (IS_ERR(rbd_dev->header.object_prefix)) {
3631                 ret = PTR_ERR(rbd_dev->header.object_prefix);
3632                 rbd_dev->header.object_prefix = NULL;
3633         } else {
3634                 dout("  object_prefix = %s\n", rbd_dev->header.object_prefix);
3635         }
3636 out:
3637         kfree(reply_buf);
3638
3639         return ret;
3640 }
3641
3642 static int _rbd_dev_v2_snap_features(struct rbd_device *rbd_dev, u64 snap_id,
3643                 u64 *snap_features)
3644 {
3645         __le64 snapid = cpu_to_le64(snap_id);
3646         struct {
3647                 __le64 features;
3648                 __le64 incompat;
3649         } __attribute__ ((packed)) features_buf = { 0 };
3650         u64 incompat;
3651         int ret;
3652
3653         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3654                                 "rbd", "get_features",
3655                                 &snapid, sizeof (snapid),
3656                                 &features_buf, sizeof (features_buf), NULL);
3657         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3658         if (ret < 0)
3659                 return ret;
3660         if (ret < sizeof (features_buf))
3661                 return -ERANGE;
3662
3663         incompat = le64_to_cpu(features_buf.incompat);
3664         if (incompat & ~RBD_FEATURES_SUPPORTED)
3665                 return -ENXIO;
3666
3667         *snap_features = le64_to_cpu(features_buf.features);
3668
3669         dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
3670                 (unsigned long long)snap_id,
3671                 (unsigned long long)*snap_features,
3672                 (unsigned long long)le64_to_cpu(features_buf.incompat));
3673
3674         return 0;
3675 }
3676
3677 static int rbd_dev_v2_features(struct rbd_device *rbd_dev)
3678 {
3679         return _rbd_dev_v2_snap_features(rbd_dev, CEPH_NOSNAP,
3680                                                 &rbd_dev->header.features);
3681 }
3682
3683 static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
3684 {
3685         struct rbd_spec *parent_spec;
3686         size_t size;
3687         void *reply_buf = NULL;
3688         __le64 snapid;
3689         void *p;
3690         void *end;
3691         char *image_id;
3692         u64 overlap;
3693         int ret;
3694
3695         parent_spec = rbd_spec_alloc();
3696         if (!parent_spec)
3697                 return -ENOMEM;
3698
3699         size = sizeof (__le64) +                                /* pool_id */
3700                 sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +        /* image_id */
3701                 sizeof (__le64) +                               /* snap_id */
3702                 sizeof (__le64);                                /* overlap */
3703         reply_buf = kmalloc(size, GFP_KERNEL);
3704         if (!reply_buf) {
3705                 ret = -ENOMEM;
3706                 goto out_err;
3707         }
3708
3709         snapid = cpu_to_le64(CEPH_NOSNAP);
3710         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3711                                 "rbd", "get_parent",
3712                                 &snapid, sizeof (snapid),
3713                                 reply_buf, size, NULL);
3714         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3715         if (ret < 0)
3716                 goto out_err;
3717
3718         p = reply_buf;
3719         end = reply_buf + ret;
3720         ret = -ERANGE;
3721         ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
3722         if (parent_spec->pool_id == CEPH_NOPOOL)
3723                 goto out;       /* No parent?  No problem. */
3724
3725         /* The ceph file layout needs to fit pool id in 32 bits */
3726
3727         ret = -EIO;
3728         if (WARN_ON(parent_spec->pool_id > (u64)U32_MAX))
3729                 goto out_err;
3730
3731         image_id = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3732         if (IS_ERR(image_id)) {
3733                 ret = PTR_ERR(image_id);
3734                 goto out_err;
3735         }
3736         parent_spec->image_id = image_id;
3737         ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
3738         ceph_decode_64_safe(&p, end, overlap, out_err);
3739
3740         rbd_dev->parent_overlap = overlap;
3741         rbd_dev->parent_spec = parent_spec;
3742         parent_spec = NULL;     /* rbd_dev now owns this */
3743 out:
3744         ret = 0;
3745 out_err:
3746         kfree(reply_buf);
3747         rbd_spec_put(parent_spec);
3748
3749         return ret;
3750 }
3751
3752 static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
3753 {
3754         size_t image_id_size;
3755         char *image_id;
3756         void *p;
3757         void *end;
3758         size_t size;
3759         void *reply_buf = NULL;
3760         size_t len = 0;
3761         char *image_name = NULL;
3762         int ret;
3763
3764         rbd_assert(!rbd_dev->spec->image_name);
3765
3766         len = strlen(rbd_dev->spec->image_id);
3767         image_id_size = sizeof (__le32) + len;
3768         image_id = kmalloc(image_id_size, GFP_KERNEL);
3769         if (!image_id)
3770                 return NULL;
3771
3772         p = image_id;
3773         end = image_id + image_id_size;
3774         ceph_encode_string(&p, end, rbd_dev->spec->image_id, (u32)len);
3775
3776         size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
3777         reply_buf = kmalloc(size, GFP_KERNEL);
3778         if (!reply_buf)
3779                 goto out;
3780
3781         ret = rbd_obj_method_sync(rbd_dev, RBD_DIRECTORY,
3782                                 "rbd", "dir_get_name",
3783                                 image_id, image_id_size,
3784                                 reply_buf, size, NULL);
3785         if (ret < 0)
3786                 goto out;
3787         p = reply_buf;
3788         end = reply_buf + size;
3789         image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
3790         if (IS_ERR(image_name))
3791                 image_name = NULL;
3792         else
3793                 dout("%s: name is %s len is %zd\n", __func__, image_name, len);
3794 out:
3795         kfree(reply_buf);
3796         kfree(image_id);
3797
3798         return image_name;
3799 }
3800
3801 /*
3802  * When a parent image gets probed, we only have the pool, image,
3803  * and snapshot ids but not the names of any of them.  This call
3804  * is made later to fill in those names.  It has to be done after
3805  * rbd_dev_snaps_update() has completed because some of the
3806  * information (in particular, snapshot name) is not available
3807  * until then.
3808  */
3809 static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
3810 {
3811         struct ceph_osd_client *osdc;
3812         const char *name;
3813         void *reply_buf = NULL;
3814         int ret;
3815
3816         if (rbd_dev->spec->pool_name)
3817                 return 0;       /* Already have the names */
3818
3819         /* Look up the pool name */
3820
3821         osdc = &rbd_dev->rbd_client->client->osdc;
3822         name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
3823         if (!name) {
3824                 rbd_warn(rbd_dev, "there is no pool with id %llu",
3825                         rbd_dev->spec->pool_id);        /* Really a BUG() */
3826                 return -EIO;
3827         }
3828
3829         rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
3830         if (!rbd_dev->spec->pool_name)
3831                 return -ENOMEM;
3832
3833         /* Fetch the image name; tolerate failure here */
3834
3835         name = rbd_dev_image_name(rbd_dev);
3836         if (name)
3837                 rbd_dev->spec->image_name = (char *)name;
3838         else
3839                 rbd_warn(rbd_dev, "unable to get image name");
3840
3841         /* Look up the snapshot name. */
3842
3843         name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
3844         if (!name) {
3845                 rbd_warn(rbd_dev, "no snapshot with id %llu",
3846                         rbd_dev->spec->snap_id);        /* Really a BUG() */
3847                 ret = -EIO;
3848                 goto out_err;
3849         }
3850         rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
3851         if(!rbd_dev->spec->snap_name)
3852                 goto out_err;
3853
3854         return 0;
3855 out_err:
3856         kfree(reply_buf);
3857         kfree(rbd_dev->spec->pool_name);
3858         rbd_dev->spec->pool_name = NULL;
3859
3860         return ret;
3861 }
3862
3863 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
3864 {
3865         size_t size;
3866         int ret;
3867         void *reply_buf;
3868         void *p;
3869         void *end;
3870         u64 seq;
3871         u32 snap_count;
3872         struct ceph_snap_context *snapc;
3873         u32 i;
3874
3875         /*
3876          * We'll need room for the seq value (maximum snapshot id),
3877          * snapshot count, and array of that many snapshot ids.
3878          * For now we have a fixed upper limit on the number we're
3879          * prepared to receive.
3880          */
3881         size = sizeof (__le64) + sizeof (__le32) +
3882                         RBD_MAX_SNAP_COUNT * sizeof (__le64);
3883         reply_buf = kzalloc(size, GFP_KERNEL);
3884         if (!reply_buf)
3885                 return -ENOMEM;
3886
3887         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3888                                 "rbd", "get_snapcontext", NULL, 0,
3889                                 reply_buf, size, ver);
3890         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3891         if (ret < 0)
3892                 goto out;
3893
3894         p = reply_buf;
3895         end = reply_buf + ret;
3896         ret = -ERANGE;
3897         ceph_decode_64_safe(&p, end, seq, out);
3898         ceph_decode_32_safe(&p, end, snap_count, out);
3899
3900         /*
3901          * Make sure the reported number of snapshot ids wouldn't go
3902          * beyond the end of our buffer.  But before checking that,
3903          * make sure the computed size of the snapshot context we
3904          * allocate is representable in a size_t.
3905          */
3906         if (snap_count > (SIZE_MAX - sizeof (struct ceph_snap_context))
3907                                  / sizeof (u64)) {
3908                 ret = -EINVAL;
3909                 goto out;
3910         }
3911         if (!ceph_has_room(&p, end, snap_count * sizeof (__le64)))
3912                 goto out;
3913
3914         size = sizeof (struct ceph_snap_context) +
3915                                 snap_count * sizeof (snapc->snaps[0]);
3916         snapc = kmalloc(size, GFP_KERNEL);
3917         if (!snapc) {
3918                 ret = -ENOMEM;
3919                 goto out;
3920         }
3921         ret = 0;
3922
3923         atomic_set(&snapc->nref, 1);
3924         snapc->seq = seq;
3925         snapc->num_snaps = snap_count;
3926         for (i = 0; i < snap_count; i++)
3927                 snapc->snaps[i] = ceph_decode_64(&p);
3928
3929         rbd_dev->header.snapc = snapc;
3930
3931         dout("  snap context seq = %llu, snap_count = %u\n",
3932                 (unsigned long long)seq, (unsigned int)snap_count);
3933 out:
3934         kfree(reply_buf);
3935
3936         return ret;
3937 }
3938
3939 static char *rbd_dev_v2_snap_name(struct rbd_device *rbd_dev, u32 which)
3940 {
3941         size_t size;
3942         void *reply_buf;
3943         __le64 snap_id;
3944         int ret;
3945         void *p;
3946         void *end;
3947         char *snap_name;
3948
3949         size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
3950         reply_buf = kmalloc(size, GFP_KERNEL);
3951         if (!reply_buf)
3952                 return ERR_PTR(-ENOMEM);
3953
3954         snap_id = cpu_to_le64(rbd_dev->header.snapc->snaps[which]);
3955         ret = rbd_obj_method_sync(rbd_dev, rbd_dev->header_name,
3956                                 "rbd", "get_snapshot_name",
3957                                 &snap_id, sizeof (snap_id),
3958                                 reply_buf, size, NULL);
3959         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
3960         if (ret < 0)
3961                 goto out;
3962
3963         p = reply_buf;
3964         end = reply_buf + size;
3965         snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
3966         if (IS_ERR(snap_name)) {
3967                 ret = PTR_ERR(snap_name);
3968                 goto out;
3969         } else {
3970                 dout("  snap_id 0x%016llx snap_name = %s\n",
3971                         (unsigned long long)le64_to_cpu(snap_id), snap_name);
3972         }
3973         kfree(reply_buf);
3974
3975         return snap_name;
3976 out:
3977         kfree(reply_buf);
3978
3979         return ERR_PTR(ret);
3980 }
3981
3982 static char *rbd_dev_v2_snap_info(struct rbd_device *rbd_dev, u32 which,
3983                 u64 *snap_size, u64 *snap_features)
3984 {
3985         u64 snap_id;
3986         u8 order;
3987         int ret;
3988
3989         snap_id = rbd_dev->header.snapc->snaps[which];
3990         ret = _rbd_dev_v2_snap_size(rbd_dev, snap_id, &order, snap_size);
3991         if (ret)
3992                 return ERR_PTR(ret);
3993         ret = _rbd_dev_v2_snap_features(rbd_dev, snap_id, snap_features);
3994         if (ret)
3995                 return ERR_PTR(ret);
3996
3997         return rbd_dev_v2_snap_name(rbd_dev, which);
3998 }
3999
4000 static char *rbd_dev_snap_info(struct rbd_device *rbd_dev, u32 which,
4001                 u64 *snap_size, u64 *snap_features)
4002 {
4003         if (rbd_dev->image_format == 1)
4004                 return rbd_dev_v1_snap_info(rbd_dev, which,
4005                                         snap_size, snap_features);
4006         if (rbd_dev->image_format == 2)
4007                 return rbd_dev_v2_snap_info(rbd_dev, which,
4008                                         snap_size, snap_features);
4009         return ERR_PTR(-EINVAL);
4010 }
4011
4012 static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
4013 {
4014         int ret;
4015         __u8 obj_order;
4016
4017         down_write(&rbd_dev->header_rwsem);
4018
4019         /* Grab old order first, to see if it changes */
4020
4021         obj_order = rbd_dev->header.obj_order,
4022         ret = rbd_dev_v2_image_size(rbd_dev);
4023         if (ret)
4024                 goto out;
4025         if (rbd_dev->header.obj_order != obj_order) {
4026                 ret = -EIO;
4027                 goto out;
4028         }
4029         rbd_update_mapping_size(rbd_dev);
4030
4031         ret = rbd_dev_v2_snap_context(rbd_dev, hver);
4032         dout("rbd_dev_v2_snap_context returned %d\n", ret);
4033         if (ret)
4034                 goto out;
4035         ret = rbd_dev_snaps_update(rbd_dev);
4036         dout("rbd_dev_snaps_update returned %d\n", ret);
4037         if (ret)
4038                 goto out;
4039         ret = rbd_dev_snaps_register(rbd_dev);
4040         dout("rbd_dev_snaps_register returned %d\n", ret);
4041 out:
4042         up_write(&rbd_dev->header_rwsem);
4043
4044         return ret;
4045 }
4046
4047 /*
4048  * Scan the rbd device's current snapshot list and compare it to the
4049  * newly-received snapshot context.  Remove any existing snapshots
4050  * not present in the new snapshot context.  Add a new snapshot for
4051  * any snaphots in the snapshot context not in the current list.
4052  * And verify there are no changes to snapshots we already know
4053  * about.
4054  *
4055  * Assumes the snapshots in the snapshot context are sorted by
4056  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
4057  * are also maintained in that order.)
4058  */
4059 static int rbd_dev_snaps_update(struct rbd_device *rbd_dev)
4060 {
4061         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
4062         const u32 snap_count = snapc->num_snaps;
4063         struct list_head *head = &rbd_dev->snaps;
4064         struct list_head *links = head->next;
4065         u32 index = 0;
4066
4067         dout("%s: snap count is %u\n", __func__, (unsigned int) snap_count);
4068         while (index < snap_count || links != head) {
4069                 u64 snap_id;
4070                 struct rbd_snap *snap;
4071                 char *snap_name;
4072                 u64 snap_size = 0;
4073                 u64 snap_features = 0;
4074
4075                 snap_id = index < snap_count ? snapc->snaps[index]
4076                                              : CEPH_NOSNAP;
4077                 snap = links != head ? list_entry(links, struct rbd_snap, node)
4078                                      : NULL;
4079                 rbd_assert(!snap || snap->id != CEPH_NOSNAP);
4080
4081                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
4082                         struct list_head *next = links->next;
4083
4084                         /*
4085                          * A previously-existing snapshot is not in
4086                          * the new snap context.
4087                          *
4088                          * If the now missing snapshot is the one the
4089                          * image is mapped to, clear its exists flag
4090                          * so we can avoid sending any more requests
4091                          * to it.
4092                          */
4093                         if (rbd_dev->spec->snap_id == snap->id)
4094                                 clear_bit(RBD_DEV_FLAG_EXISTS, &rbd_dev->flags);
4095                         rbd_remove_snap_dev(snap);
4096                         dout("%ssnap id %llu has been removed\n",
4097                                 rbd_dev->spec->snap_id == snap->id ?
4098                                                         "mapped " : "",
4099                                 (unsigned long long) snap->id);
4100
4101                         /* Done with this list entry; advance */
4102
4103                         links = next;
4104                         continue;
4105                 }
4106
4107                 snap_name = rbd_dev_snap_info(rbd_dev, index,
4108                                         &snap_size, &snap_features);
4109                 if (IS_ERR(snap_name))
4110                         return PTR_ERR(snap_name);
4111
4112                 dout("entry %u: snap_id = %llu\n", (unsigned int) snap_count,
4113                         (unsigned long long) snap_id);
4114                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
4115                         struct rbd_snap *new_snap;
4116
4117                         /* We haven't seen this snapshot before */
4118
4119                         new_snap = __rbd_add_snap_dev(rbd_dev, snap_name,
4120                                         snap_id, snap_size, snap_features);
4121                         if (IS_ERR(new_snap)) {
4122                                 int err = PTR_ERR(new_snap);
4123
4124                                 dout("  failed to add dev, error %d\n", err);
4125
4126                                 return err;
4127                         }
4128
4129                         /* New goes before existing, or at end of list */
4130
4131                         dout("  added dev%s\n", snap ? "" : " at end\n");
4132                         if (snap)
4133                                 list_add_tail(&new_snap->node, &snap->node);
4134                         else
4135                                 list_add_tail(&new_snap->node, head);
4136                 } else {
4137                         /* Already have this one */
4138
4139                         dout("  already present\n");
4140
4141                         rbd_assert(snap->size == snap_size);
4142                         rbd_assert(!strcmp(snap->name, snap_name));
4143                         rbd_assert(snap->features == snap_features);
4144
4145                         /* Done with this list entry; advance */
4146
4147                         links = links->next;
4148                 }
4149
4150                 /* Advance to the next entry in the snapshot context */
4151
4152                 index++;
4153         }
4154         dout("%s: done\n", __func__);
4155
4156         return 0;
4157 }
4158
4159 /*
4160  * Scan the list of snapshots and register the devices for any that
4161  * have not already been registered.
4162  */
4163 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev)
4164 {
4165         struct rbd_snap *snap;
4166         int ret = 0;
4167
4168         dout("%s:\n", __func__);
4169         if (WARN_ON(!device_is_registered(&rbd_dev->dev)))
4170                 return -EIO;
4171
4172         list_for_each_entry(snap, &rbd_dev->snaps, node) {
4173                 if (!rbd_snap_registered(snap)) {
4174                         ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
4175                         if (ret < 0)
4176                                 break;
4177                 }
4178         }
4179         dout("%s: returning %d\n", __func__, ret);
4180
4181         return ret;
4182 }
4183
4184 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
4185 {
4186         struct device *dev;
4187         int ret;
4188
4189         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4190
4191         dev = &rbd_dev->dev;
4192         dev->bus = &rbd_bus_type;
4193         dev->type = &rbd_device_type;
4194         dev->parent = &rbd_root_dev;
4195         dev->release = rbd_dev_release;
4196         dev_set_name(dev, "%d", rbd_dev->dev_id);
4197         ret = device_register(dev);
4198
4199         mutex_unlock(&ctl_mutex);
4200
4201         return ret;
4202 }
4203
4204 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
4205 {
4206         device_unregister(&rbd_dev->dev);
4207 }
4208
/* Counter that rbd_dev_id_get() increments to obtain a device id; starts at 0 */
static atomic64_t rbd_dev_id_max = ATOMIC64_INIT(0);
4210
4211 /*
4212  * Get a unique rbd identifier for the given new rbd_dev, and add
4213  * the rbd_dev to the global list.  The minimum rbd id is 1.
4214  */
4215 static void rbd_dev_id_get(struct rbd_device *rbd_dev)
4216 {
4217         rbd_dev->dev_id = atomic64_inc_return(&rbd_dev_id_max);
4218
4219         spin_lock(&rbd_dev_list_lock);
4220         list_add_tail(&rbd_dev->node, &rbd_dev_list);
4221         spin_unlock(&rbd_dev_list_lock);
4222         dout("rbd_dev %p given dev id %llu\n", rbd_dev,
4223                 (unsigned long long) rbd_dev->dev_id);
4224 }
4225
4226 /*
4227  * Remove an rbd_dev from the global list, and record that its
4228  * identifier is no longer in use.
4229  */
4230 static void rbd_dev_id_put(struct rbd_device *rbd_dev)
4231 {
4232         struct list_head *tmp;
4233         int rbd_id = rbd_dev->dev_id;
4234         int max_id;
4235
4236         rbd_assert(rbd_id > 0);
4237
4238         dout("rbd_dev %p released dev id %llu\n", rbd_dev,
4239                 (unsigned long long) rbd_dev->dev_id);
4240         spin_lock(&rbd_dev_list_lock);
4241         list_del_init(&rbd_dev->node);
4242
4243         /*
4244          * If the id being "put" is not the current maximum, there
4245          * is nothing special we need to do.
4246          */
4247         if (rbd_id != atomic64_read(&rbd_dev_id_max)) {
4248                 spin_unlock(&rbd_dev_list_lock);
4249                 return;
4250         }
4251
4252         /*
4253          * We need to update the current maximum id.  Search the
4254          * list to find out what it is.  We're more likely to find
4255          * the maximum at the end, so search the list backward.
4256          */
4257         max_id = 0;
4258         list_for_each_prev(tmp, &rbd_dev_list) {
4259                 struct rbd_device *rbd_dev;
4260
4261                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4262                 if (rbd_dev->dev_id > max_id)
4263                         max_id = rbd_dev->dev_id;
4264         }
4265         spin_unlock(&rbd_dev_list_lock);
4266
4267         /*
4268          * The max id could have been updated by rbd_dev_id_get(), in
4269          * which case it now accurately reflects the new maximum.
4270          * Be careful not to overwrite the maximum value in that
4271          * case.
4272          */
4273         atomic64_cmpxchg(&rbd_dev_id_max, rbd_id, max_id);
4274         dout("  max dev id has been reset\n");
4275 }
4276
/*
 * Advance *buf past any leading white space so that it points at
 * the first non-space character (or at the terminating '\0' if
 * there is none).  Returns the length of the token found there,
 * i.e. the number of consecutive non-white-space characters.
 *
 * *buf must be terminated with '\0'.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	static const char whitespace[] = " \f\n\r\t\v";
	const char *start = *buf + strspn(*buf, whitespace);

	*buf = start;		/* Start of the token */

	return strcspn(start, whitespace);	/* Its length */
}
4295
/*
 * Locate the next token in *buf and, provided it fits (including a
 * terminating '\0') in the token buffer supplied, copy it there.
 * A copied token is always '\0'-terminated.  *buf must itself be
 * '\0'-terminated on entry.
 *
 * Returns the length of the token found, not counting the '\0'.
 * A return of 0 means no token was found; a return >= token_size
 * means the token did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token found,
 * even when the token was too long to copy.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t len = next_token(buf);
	const char *start = *buf;

	*buf = start + len;
	if (len >= token_size)
		return len;		/* Too big; caller must check */

	memcpy(token, start, len);
	token[len] = '\0';

	return len;
}
4325
4326 /*
4327  * Finds the next token in *buf, dynamically allocates a buffer big
4328  * enough to hold a copy of it, and copies the token into the new
4329  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
4330  * that a duplicate buffer is created even for a zero-length token.
4331  *
4332  * Returns a pointer to the newly-allocated duplicate, or a null
4333  * pointer if memory for the duplicate was not available.  If
4334  * the lenp argument is a non-null pointer, the length of the token
4335  * (not including the '\0') is returned in *lenp.
4336  *
4337  * If successful, the *buf pointer will be updated to point beyond
4338  * the end of the found token.
4339  *
4340  * Note: uses GFP_KERNEL for allocation.
4341  */
4342 static inline char *dup_token(const char **buf, size_t *lenp)
4343 {
4344         char *dup;
4345         size_t len;
4346
4347         len = next_token(buf);
4348         dup = kmemdup(*buf, len + 1, GFP_KERNEL);
4349         if (!dup)
4350                 return NULL;
4351         *(dup + len) = '\0';
4352         *buf += len;
4353
4354         if (lenp)
4355                 *lenp = len;
4356
4357         return dup;
4358 }
4359
4360 /*
4361  * Parse the options provided for an "rbd add" (i.e., rbd image
4362  * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
4363  * and the data written is passed here via a NUL-terminated buffer.
4364  * Returns 0 if successful or an error code otherwise.
4365  *
4366  * The information extracted from these options is recorded in
4367  * the other parameters which return dynamically-allocated
4368  * structures:
4369  *  ceph_opts
4370  *      The address of a pointer that will refer to a ceph options
4371  *      structure.  Caller must release the returned pointer using
4372  *      ceph_destroy_options() when it is no longer needed.
4373  *  rbd_opts
4374  *      Address of an rbd options pointer.  Fully initialized by
4375  *      this function; caller must release with kfree().
4376  *  spec
4377  *      Address of an rbd image specification pointer.  Fully
4378  *      initialized by this function based on parsed options.
4379  *      Caller must release with rbd_spec_put().
4380  *
4381  * The options passed take this form:
4382  *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
4383  * where:
4384  *  <mon_addrs>
4385  *      A comma-separated list of one or more monitor addresses.
4386  *      A monitor address is an ip address, optionally followed
4387  *      by a port number (separated by a colon).
4388  *        I.e.:  ip1[:port1][,ip2[:port2]...]
4389  *  <options>
4390  *      A comma-separated list of ceph and/or rbd options.
4391  *  <pool_name>
4392  *      The name of the rados pool containing the rbd image.
4393  *  <image_name>
4394  *      The name of the image in that pool to map.
4395  *  <snap_id>
4396  *      An optional snapshot id.  If provided, the mapping will
4397  *      present data from the image at the time that snapshot was
4398  *      created.  The image head is used if no snapshot id is
4399  *      provided.  Snapshot mappings are always read-only.
4400  */
4401 static int rbd_add_parse_args(const char *buf,
4402                                 struct ceph_options **ceph_opts,
4403                                 struct rbd_options **opts,
4404                                 struct rbd_spec **rbd_spec)
4405 {
4406         size_t len;
4407         char *options;
4408         const char *mon_addrs;
4409         size_t mon_addrs_size;
4410         struct rbd_spec *spec = NULL;
4411         struct rbd_options *rbd_opts = NULL;
4412         struct ceph_options *copts;
4413         int ret;
4414
4415         /* The first four tokens are required */
4416
4417         len = next_token(&buf);
4418         if (!len) {
4419                 rbd_warn(NULL, "no monitor address(es) provided");
4420                 return -EINVAL;
4421         }
4422         mon_addrs = buf;
4423         mon_addrs_size = len + 1;
4424         buf += len;
4425
4426         ret = -EINVAL;
4427         options = dup_token(&buf, NULL);
4428         if (!options)
4429                 return -ENOMEM;
4430         if (!*options) {
4431                 rbd_warn(NULL, "no options provided");
4432                 goto out_err;
4433         }
4434
4435         spec = rbd_spec_alloc();
4436         if (!spec)
4437                 goto out_mem;
4438
4439         spec->pool_name = dup_token(&buf, NULL);
4440         if (!spec->pool_name)
4441                 goto out_mem;
4442         if (!*spec->pool_name) {
4443                 rbd_warn(NULL, "no pool name provided");
4444                 goto out_err;
4445         }
4446
4447         spec->image_name = dup_token(&buf, NULL);
4448         if (!spec->image_name)
4449                 goto out_mem;
4450         if (!*spec->image_name) {
4451                 rbd_warn(NULL, "no image name provided");
4452                 goto out_err;
4453         }
4454
4455         /*
4456          * Snapshot name is optional; default is to use "-"
4457          * (indicating the head/no snapshot).
4458          */
4459         len = next_token(&buf);
4460         if (!len) {
4461                 buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
4462                 len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
4463         } else if (len > RBD_MAX_SNAP_NAME_LEN) {
4464                 ret = -ENAMETOOLONG;
4465                 goto out_err;
4466         }
4467         spec->snap_name = kmemdup(buf, len + 1, GFP_KERNEL);
4468         if (!spec->snap_name)
4469                 goto out_mem;
4470         *(spec->snap_name + len) = '\0';
4471
4472         /* Initialize all rbd options to the defaults */
4473
4474         rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
4475         if (!rbd_opts)
4476                 goto out_mem;
4477
4478         rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
4479
4480         copts = ceph_parse_options(options, mon_addrs,
4481                                         mon_addrs + mon_addrs_size - 1,
4482                                         parse_rbd_opts_token, rbd_opts);
4483         if (IS_ERR(copts)) {
4484                 ret = PTR_ERR(copts);
4485                 goto out_err;
4486         }
4487         kfree(options);
4488
4489         *ceph_opts = copts;
4490         *opts = rbd_opts;
4491         *rbd_spec = spec;
4492
4493         return 0;
4494 out_mem:
4495         ret = -ENOMEM;
4496 out_err:
4497         kfree(rbd_opts);
4498         rbd_spec_put(spec);
4499         kfree(options);
4500
4501         return ret;
4502 }
4503
4504 /*
4505  * An rbd format 2 image has a unique identifier, distinct from the
4506  * name given to it by the user.  Internally, that identifier is
4507  * what's used to specify the names of objects related to the image.
4508  *
4509  * A special "rbd id" object is used to map an rbd image name to its
4510  * id.  If that object doesn't exist, then there is no v2 rbd image
4511  * with the supplied name.
4512  *
4513  * This function will record the given rbd_dev's image_id field if
4514  * it can be determined, and in that case will return 0.  If any
4515  * errors occur a negative errno will be returned and the rbd_dev's
4516  * image_id field will be unchanged (and should be NULL).
4517  */
4518 static int rbd_dev_image_id(struct rbd_device *rbd_dev)
4519 {
4520         int ret;
4521         size_t size;
4522         char *object_name;
4523         void *response;
4524         void *p;
4525
4526         /* If we already have it we don't need to look it up */
4527
4528         if (rbd_dev->spec->image_id)
4529                 return 0;
4530
4531         /*
4532          * When probing a parent image, the image id is already
4533          * known (and the image name likely is not).  There's no
4534          * need to fetch the image id again in this case.
4535          */
4536         if (rbd_dev->spec->image_id)
4537                 return 0;
4538
4539         /*
4540          * First, see if the format 2 image id file exists, and if
4541          * so, get the image's persistent id from it.
4542          */
4543         size = sizeof (RBD_ID_PREFIX) + strlen(rbd_dev->spec->image_name);
4544         object_name = kmalloc(size, GFP_NOIO);
4545         if (!object_name)
4546                 return -ENOMEM;
4547         sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
4548         dout("rbd id object name is %s\n", object_name);
4549
4550         /* Response will be an encoded string, which includes a length */
4551
4552         size = sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX;
4553         response = kzalloc(size, GFP_NOIO);
4554         if (!response) {
4555                 ret = -ENOMEM;
4556                 goto out;
4557         }
4558
4559         ret = rbd_obj_method_sync(rbd_dev, object_name,
4560                                 "rbd", "get_id", NULL, 0,
4561                                 response, RBD_IMAGE_ID_LEN_MAX, NULL);
4562         dout("%s: rbd_obj_method_sync returned %d\n", __func__, ret);
4563         if (ret < 0)
4564                 goto out;
4565
4566         p = response;
4567         rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
4568                                                 p + ret,
4569                                                 NULL, GFP_NOIO);
4570         ret = 0;
4571
4572         if (IS_ERR(rbd_dev->spec->image_id)) {
4573                 ret = PTR_ERR(rbd_dev->spec->image_id);
4574                 rbd_dev->spec->image_id = NULL;
4575         } else {
4576                 dout("image_id is %s\n", rbd_dev->spec->image_id);
4577         }
4578 out:
4579         kfree(response);
4580         kfree(object_name);
4581
4582         return ret;
4583 }
4584
4585 static int rbd_dev_v1_probe(struct rbd_device *rbd_dev)
4586 {
4587         int ret;
4588         size_t size;
4589
4590         /* Version 1 images have no id; empty string is used */
4591
4592         rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
4593         if (!rbd_dev->spec->image_id)
4594                 return -ENOMEM;
4595
4596         /* Record the header object name for this rbd image. */
4597
4598         size = strlen(rbd_dev->spec->image_name) + sizeof (RBD_SUFFIX);
4599         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4600         if (!rbd_dev->header_name) {
4601                 ret = -ENOMEM;
4602                 goto out_err;
4603         }
4604         sprintf(rbd_dev->header_name, "%s%s",
4605                 rbd_dev->spec->image_name, RBD_SUFFIX);
4606
4607         /* Populate rbd image metadata */
4608
4609         ret = rbd_read_header(rbd_dev, &rbd_dev->header);
4610         if (ret < 0)
4611                 goto out_err;
4612
4613         /* Version 1 images have no parent (no layering) */
4614
4615         rbd_dev->parent_spec = NULL;
4616         rbd_dev->parent_overlap = 0;
4617
4618         rbd_dev->image_format = 1;
4619
4620         dout("discovered version 1 image, header name is %s\n",
4621                 rbd_dev->header_name);
4622
4623         return 0;
4624
4625 out_err:
4626         kfree(rbd_dev->header_name);
4627         rbd_dev->header_name = NULL;
4628         kfree(rbd_dev->spec->image_id);
4629         rbd_dev->spec->image_id = NULL;
4630
4631         return ret;
4632 }
4633
4634 static int rbd_dev_v2_probe(struct rbd_device *rbd_dev)
4635 {
4636         size_t size;
4637         int ret;
4638         u64 ver = 0;
4639
4640         /*
4641          * Image id was filled in by the caller.  Record the header
4642          * object name for this rbd image.
4643          */
4644         size = sizeof (RBD_HEADER_PREFIX) + strlen(rbd_dev->spec->image_id);
4645         rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
4646         if (!rbd_dev->header_name)
4647                 return -ENOMEM;
4648         sprintf(rbd_dev->header_name, "%s%s",
4649                         RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
4650
4651         /* Get the size and object order for the image */
4652         ret = rbd_dev_v2_image_size(rbd_dev);
4653         if (ret)
4654                 goto out_err;
4655
4656         /* Get the object prefix (a.k.a. block_name) for the image */
4657
4658         ret = rbd_dev_v2_object_prefix(rbd_dev);
4659         if (ret)
4660                 goto out_err;
4661
4662         /* Get the and check features for the image */
4663
4664         ret = rbd_dev_v2_features(rbd_dev);
4665         if (ret)
4666                 goto out_err;
4667
4668         /* If the image supports layering, get the parent info */
4669
4670         if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
4671                 ret = rbd_dev_v2_parent_info(rbd_dev);
4672                 if (ret)
4673                         goto out_err;
4674         }
4675
4676         /* crypto and compression type aren't (yet) supported for v2 images */
4677
4678         rbd_dev->header.crypt_type = 0;
4679         rbd_dev->header.comp_type = 0;
4680
4681         /* Get the snapshot context, plus the header version */
4682
4683         ret = rbd_dev_v2_snap_context(rbd_dev, &ver);
4684         if (ret)
4685                 goto out_err;
4686         rbd_dev->header.obj_version = ver;
4687
4688         rbd_dev->image_format = 2;
4689
4690         dout("discovered version 2 image, header name is %s\n",
4691                 rbd_dev->header_name);
4692
4693         return 0;
4694 out_err:
4695         rbd_dev->parent_overlap = 0;
4696         rbd_spec_put(rbd_dev->parent_spec);
4697         rbd_dev->parent_spec = NULL;
4698         kfree(rbd_dev->header_name);
4699         rbd_dev->header_name = NULL;
4700         kfree(rbd_dev->header.object_prefix);
4701         rbd_dev->header.object_prefix = NULL;
4702
4703         return ret;
4704 }
4705
4706 static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
4707 {
4708         struct rbd_device *parent = NULL;
4709         struct rbd_spec *parent_spec = NULL;
4710         struct rbd_client *rbdc = NULL;
4711         int ret;
4712
4713         /* no need to lock here, as rbd_dev is not registered yet */
4714         ret = rbd_dev_snaps_update(rbd_dev);
4715         if (ret)
4716                 return ret;
4717
4718         ret = rbd_dev_probe_update_spec(rbd_dev);
4719         if (ret)
4720                 goto err_out_snaps;
4721
4722         ret = rbd_dev_set_mapping(rbd_dev);
4723         if (ret)
4724                 goto err_out_snaps;
4725
4726         /* generate unique id: find highest unique id, add one */
4727         rbd_dev_id_get(rbd_dev);
4728
4729         /* Fill in the device name, now that we have its id. */
4730         BUILD_BUG_ON(DEV_NAME_LEN
4731                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
4732         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
4733
4734         /* Get our block major device number. */
4735
4736         ret = register_blkdev(0, rbd_dev->name);
4737         if (ret < 0)
4738                 goto err_out_id;
4739         rbd_dev->major = ret;
4740
4741         /* Set up the blkdev mapping. */
4742
4743         ret = rbd_init_disk(rbd_dev);
4744         if (ret)
4745                 goto err_out_blkdev;
4746
4747         ret = rbd_bus_add_dev(rbd_dev);
4748         if (ret)
4749                 goto err_out_disk;
4750
4751         /*
4752          * At this point cleanup in the event of an error is the job
4753          * of the sysfs code (initiated by rbd_bus_del_dev()).
4754          */
4755         /* Probe the parent if there is one */
4756
4757         if (rbd_dev->parent_spec) {
4758                 /*
4759                  * We need to pass a reference to the client and the
4760                  * parent spec when creating the parent rbd_dev.
4761                  * Images related by parent/child relationships
4762                  * always share both.
4763                  */
4764                 parent_spec = rbd_spec_get(rbd_dev->parent_spec);
4765                 rbdc = __rbd_get_client(rbd_dev->rbd_client);
4766
4767                 parent = rbd_dev_create(rbdc, parent_spec);
4768                 if (!parent) {
4769                         ret = -ENOMEM;
4770                         goto err_out_spec;
4771                 }
4772                 rbdc = NULL;            /* parent now owns reference */
4773                 parent_spec = NULL;     /* parent now owns reference */
4774                 ret = rbd_dev_probe(parent);
4775                 if (ret < 0)
4776                         goto err_out_parent;
4777                 rbd_dev->parent = parent;
4778         }
4779
4780         down_write(&rbd_dev->header_rwsem);
4781         ret = rbd_dev_snaps_register(rbd_dev);
4782         up_write(&rbd_dev->header_rwsem);
4783         if (ret)
4784                 goto err_out_bus;
4785
4786         ret = rbd_dev_header_watch_sync(rbd_dev, 1);
4787         if (ret)
4788                 goto err_out_bus;
4789
4790         /* Everything's ready.  Announce the disk to the world. */
4791
4792         add_disk(rbd_dev->disk);
4793
4794         pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
4795                 (unsigned long long) rbd_dev->mapping.size);
4796
4797         return ret;
4798
4799 err_out_parent:
4800         rbd_dev_destroy(parent);
4801 err_out_spec:
4802         rbd_spec_put(parent_spec);
4803         rbd_put_client(rbdc);
4804 err_out_bus:
4805         /* this will also clean up rest of rbd_dev stuff */
4806
4807         rbd_bus_del_dev(rbd_dev);
4808
4809         return ret;
4810 err_out_disk:
4811         rbd_free_disk(rbd_dev);
4812 err_out_blkdev:
4813         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4814 err_out_id:
4815         rbd_dev_id_put(rbd_dev);
4816 err_out_snaps:
4817         rbd_remove_all_snaps(rbd_dev);
4818
4819         return ret;
4820 }
4821
4822 /*
4823  * Probe for the existence of the header object for the given rbd
4824  * device.  For format 2 images this includes determining the image
4825  * id.
4826  */
4827 static int rbd_dev_probe(struct rbd_device *rbd_dev)
4828 {
4829         int ret;
4830
4831         /*
4832          * Get the id from the image id object.  If it's not a
4833          * format 2 image, we'll get ENOENT back, and we'll assume
4834          * it's a format 1 image.
4835          */
4836         ret = rbd_dev_image_id(rbd_dev);
4837         if (ret)
4838                 ret = rbd_dev_v1_probe(rbd_dev);
4839         else
4840                 ret = rbd_dev_v2_probe(rbd_dev);
4841         if (ret) {
4842                 dout("probe failed, returning %d\n", ret);
4843
4844                 return ret;
4845         }
4846
4847         ret = rbd_dev_probe_finish(rbd_dev);
4848         if (ret)
4849                 rbd_header_free(&rbd_dev->header);
4850
4851         return ret;
4852 }
4853
4854 static ssize_t rbd_add(struct bus_type *bus,
4855                        const char *buf,
4856                        size_t count)
4857 {
4858         struct rbd_device *rbd_dev = NULL;
4859         struct ceph_options *ceph_opts = NULL;
4860         struct rbd_options *rbd_opts = NULL;
4861         struct rbd_spec *spec = NULL;
4862         struct rbd_client *rbdc;
4863         struct ceph_osd_client *osdc;
4864         int rc = -ENOMEM;
4865
4866         if (!try_module_get(THIS_MODULE))
4867                 return -ENODEV;
4868
4869         /* parse add command */
4870         rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
4871         if (rc < 0)
4872                 goto err_out_module;
4873
4874         rbdc = rbd_get_client(ceph_opts);
4875         if (IS_ERR(rbdc)) {
4876                 rc = PTR_ERR(rbdc);
4877                 goto err_out_args;
4878         }
4879         ceph_opts = NULL;       /* rbd_dev client now owns this */
4880
4881         /* pick the pool */
4882         osdc = &rbdc->client->osdc;
4883         rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
4884         if (rc < 0)
4885                 goto err_out_client;
4886         spec->pool_id = (u64) rc;
4887
4888         /* The ceph file layout needs to fit pool id in 32 bits */
4889
4890         if (WARN_ON(spec->pool_id > (u64) U32_MAX)) {
4891                 rc = -EIO;
4892                 goto err_out_client;
4893         }
4894
4895         rbd_dev = rbd_dev_create(rbdc, spec);
4896         if (!rbd_dev)
4897                 goto err_out_client;
4898         rbdc = NULL;            /* rbd_dev now owns this */
4899         spec = NULL;            /* rbd_dev now owns this */
4900
4901         rbd_dev->mapping.read_only = rbd_opts->read_only;
4902         kfree(rbd_opts);
4903         rbd_opts = NULL;        /* done with this */
4904
4905         rc = rbd_dev_probe(rbd_dev);
4906         if (rc < 0)
4907                 goto err_out_rbd_dev;
4908
4909         return count;
4910 err_out_rbd_dev:
4911         rbd_dev_destroy(rbd_dev);
4912 err_out_client:
4913         rbd_put_client(rbdc);
4914 err_out_args:
4915         if (ceph_opts)
4916                 ceph_destroy_options(ceph_opts);
4917         kfree(rbd_opts);
4918         rbd_spec_put(spec);
4919 err_out_module:
4920         module_put(THIS_MODULE);
4921
4922         dout("Error adding device %s\n", buf);
4923
4924         return (ssize_t) rc;
4925 }
4926
4927 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
4928 {
4929         struct list_head *tmp;
4930         struct rbd_device *rbd_dev;
4931
4932         spin_lock(&rbd_dev_list_lock);
4933         list_for_each(tmp, &rbd_dev_list) {
4934                 rbd_dev = list_entry(tmp, struct rbd_device, node);
4935                 if (rbd_dev->dev_id == dev_id) {
4936                         spin_unlock(&rbd_dev_list_lock);
4937                         return rbd_dev;
4938                 }
4939         }
4940         spin_unlock(&rbd_dev_list_lock);
4941         return NULL;
4942 }
4943
4944 static void rbd_dev_release(struct device *dev)
4945 {
4946         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
4947
4948         if (rbd_dev->watch_event)
4949                 rbd_dev_header_watch_sync(rbd_dev, 0);
4950
4951         /* clean up and free blkdev */
4952         rbd_free_disk(rbd_dev);
4953         unregister_blkdev(rbd_dev->major, rbd_dev->name);
4954
4955         /* release allocated disk header fields */
4956         rbd_header_free(&rbd_dev->header);
4957
4958         /* done with the id, and with the rbd_dev */
4959         rbd_dev_id_put(rbd_dev);
4960         rbd_assert(rbd_dev->rbd_client != NULL);
4961         rbd_dev_destroy(rbd_dev);
4962
4963         /* release module ref */
4964         module_put(THIS_MODULE);
4965 }
4966
/*
 * Remove one rbd device: drop all of its snapshots first, then
 * unregister the device from the rbd bus.
 */
static void __rbd_remove(struct rbd_device *rbd_dev)
{
	rbd_remove_all_snaps(rbd_dev);

	rbd_bus_del_dev(rbd_dev);
}
4972
4973 static ssize_t rbd_remove(struct bus_type *bus,
4974                           const char *buf,
4975                           size_t count)
4976 {
4977         struct rbd_device *rbd_dev = NULL;
4978         int target_id, rc;
4979         unsigned long ul;
4980         int ret = count;
4981
4982         rc = strict_strtoul(buf, 10, &ul);
4983         if (rc)
4984                 return rc;
4985
4986         /* convert to int; abort if we lost anything in the conversion */
4987         target_id = (int) ul;
4988         if (target_id != ul)
4989                 return -EINVAL;
4990
4991         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
4992
4993         rbd_dev = __rbd_get_dev(target_id);
4994         if (!rbd_dev) {
4995                 ret = -ENOENT;
4996                 goto done;
4997         }
4998
4999         spin_lock_irq(&rbd_dev->lock);
5000         if (rbd_dev->open_count)
5001                 ret = -EBUSY;
5002         else
5003                 set_bit(RBD_DEV_FLAG_REMOVING, &rbd_dev->flags);
5004         spin_unlock_irq(&rbd_dev->lock);
5005         if (ret < 0)
5006                 goto done;
5007
5008         while (rbd_dev->parent_spec) {
5009                 struct rbd_device *first = rbd_dev;
5010                 struct rbd_device *second = first->parent;
5011                 struct rbd_device *third;
5012
5013                 /*
5014                  * Follow to the parent with no grandparent and
5015                  * remove it.
5016                  */
5017                 while (second && (third = second->parent)) {
5018                         first = second;
5019                         second = third;
5020                 }
5021                 __rbd_remove(second);
5022                 rbd_spec_put(first->parent_spec);
5023                 first->parent_spec = NULL;
5024                 first->parent_overlap = 0;
5025                 first->parent = NULL;
5026         }
5027         __rbd_remove(rbd_dev);
5028
5029 done:
5030         mutex_unlock(&ctl_mutex);
5031
5032         return ret;
5033 }
5034
5035 /*
5036  * create control files in sysfs
5037  * /sys/bus/rbd/...
5038  */
5039 static int rbd_sysfs_init(void)
5040 {
5041         int ret;
5042
5043         ret = device_register(&rbd_root_dev);
5044         if (ret < 0)
5045                 return ret;
5046
5047         ret = bus_register(&rbd_bus_type);
5048         if (ret < 0)
5049                 device_unregister(&rbd_root_dev);
5050
5051         return ret;
5052 }
5053
5054 static void rbd_sysfs_cleanup(void)
5055 {
5056         bus_unregister(&rbd_bus_type);
5057         device_unregister(&rbd_root_dev);
5058 }
5059
5060 static int __init rbd_init(void)
5061 {
5062         int rc;
5063
5064         if (!libceph_compatible(NULL)) {
5065                 rbd_warn(NULL, "libceph incompatibility (quitting)");
5066
5067                 return -EINVAL;
5068         }
5069         rc = rbd_sysfs_init();
5070         if (rc)
5071                 return rc;
5072         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
5073         return 0;
5074 }
5075
5076 static void __exit rbd_exit(void)
5077 {
5078         rbd_sysfs_cleanup();
5079 }
5080
5081 module_init(rbd_init);
5082 module_exit(rbd_exit);
5083
5084 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
5085 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
5086 MODULE_DESCRIPTION("rados block device");
5087
5088 /* following authorship retained from original osdblk.c */
5089 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
5090
5091 MODULE_LICENSE("GPL");