rbd: use sizeof (object) instead of sizeof (type)
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         u64 snap_names_len;
85         u32 total_snaps;
86
87         char *snap_names;
88         u64 *snap_sizes;
89
90         u64 obj_version;
91 };
92
93 struct rbd_options {
94         int     notify_timeout;
95 };
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;
102         struct rbd_options      *rbd_opts;
103         struct kref             kref;
104         struct list_head        node;
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;
112         int rc;
113         u64 bytes;
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;
134         int                     coll_index;
135         struct rbd_req_coll     *coll;
136 };
137
138 struct rbd_snap {
139         struct  device          dev;
140         const char              *name;
141         u64                     size;
142         struct list_head        node;
143         u64                     id;
144 };
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;        /* current snapshot id */
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
191 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
192
193 static LIST_HEAD(rbd_dev_list);    /* devices */
194 static DEFINE_SPINLOCK(rbd_dev_list_lock);
195
196 static LIST_HEAD(rbd_client_list);              /* clients */
197 static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
212 static struct bus_attribute rbd_bus_attrs[] = {
213         __ATTR(add, S_IWUSR, NULL, rbd_add),
214         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215         __ATTR_NULL
216 };
217
218 static struct bus_type rbd_bus_type = {
219         .name           = "rbd",
220         .bus_attrs      = rbd_bus_attrs,
221 };
222
223 static void rbd_root_dev_release(struct device *dev)
224 {
225 }
226
227 static struct device rbd_root_dev = {
228         .init_name =    "rbd",
229         .release =      rbd_root_dev_release,
230 };
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250                 return -EROFS;
251
252         rbd_get_dev(rbd_dev);
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         return 0;
256 }
257
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 {
260         struct rbd_device *rbd_dev = disk->private_data;
261
262         rbd_put_dev(rbd_dev);
263
264         return 0;
265 }
266
267 static const struct block_device_operations rbd_bd_ops = {
268         .owner                  = THIS_MODULE,
269         .open                   = rbd_open,
270         .release                = rbd_release,
271 };
272
273 /*
274  * Initialize an rbd client instance.
275  * We own *ceph_opts.
276  */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278                                             struct rbd_options *rbd_opts)
279 {
280         struct rbd_client *rbdc;
281         int ret = -ENOMEM;
282
283         dout("rbd_client_create\n");
284         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285         if (!rbdc)
286                 goto out_opt;
287
288         kref_init(&rbdc->kref);
289         INIT_LIST_HEAD(&rbdc->node);
290
291         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
293         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294         if (IS_ERR(rbdc->client))
295                 goto out_mutex;
296         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297
298         ret = ceph_open_session(rbdc->client);
299         if (ret < 0)
300                 goto out_err;
301
302         rbdc->rbd_opts = rbd_opts;
303
304         spin_lock(&rbd_client_list_lock);
305         list_add_tail(&rbdc->node, &rbd_client_list);
306         spin_unlock(&rbd_client_list_lock);
307
308         mutex_unlock(&ctl_mutex);
309
310         dout("rbd_client_create created %p\n", rbdc);
311         return rbdc;
312
313 out_err:
314         ceph_destroy_client(rbdc->client);
315 out_mutex:
316         mutex_unlock(&ctl_mutex);
317         kfree(rbdc);
318 out_opt:
319         if (ceph_opts)
320                 ceph_destroy_options(ceph_opts);
321         return ERR_PTR(ret);
322 }
323
324 /*
325  * Find a ceph client with specific addr and configuration.
326  */
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
328 {
329         struct rbd_client *client_node;
330
331         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
332                 return NULL;
333
334         list_for_each_entry(client_node, &rbd_client_list, node)
335                 if (!ceph_compare_options(ceph_opts, client_node->client))
336                         return client_node;
337         return NULL;
338 }
339
340 /*
341  * mount options
342  */
343 enum {
344         Opt_notify_timeout,
345         Opt_last_int,
346         /* int args above */
347         Opt_last_string,
348         /* string args above */
349 };
350
351 static match_table_t rbd_opts_tokens = {
352         {Opt_notify_timeout, "notify_timeout=%d"},
353         /* int args above */
354         /* string args above */
355         {-1, NULL}
356 };
357
358 static int parse_rbd_opts_token(char *c, void *private)
359 {
360         struct rbd_options *rbd_opts = private;
361         substring_t argstr[MAX_OPT_ARGS];
362         int token, intval, ret;
363
364         token = match_token(c, rbd_opts_tokens, argstr);
365         if (token < 0)
366                 return -EINVAL;
367
368         if (token < Opt_last_int) {
369                 ret = match_int(&argstr[0], &intval);
370                 if (ret < 0) {
371                         pr_err("bad mount option arg (not int) "
372                                "at '%s'\n", c);
373                         return ret;
374                 }
375                 dout("got int token %d val %d\n", token, intval);
376         } else if (token > Opt_last_int && token < Opt_last_string) {
377                 dout("got string token %d val %s\n", token,
378                      argstr[0].from);
379         } else {
380                 dout("got token %d\n", token);
381         }
382
383         switch (token) {
384         case Opt_notify_timeout:
385                 rbd_opts->notify_timeout = intval;
386                 break;
387         default:
388                 BUG_ON(token);
389         }
390         return 0;
391 }
392
393 /*
394  * Get a ceph client with specific addr and configuration, if one does
395  * not exist create it.
396  */
397 static struct rbd_client *rbd_get_client(const char *mon_addr,
398                                          size_t mon_addr_len,
399                                          char *options)
400 {
401         struct rbd_client *rbdc;
402         struct ceph_options *ceph_opts;
403         struct rbd_options *rbd_opts;
404
405         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406         if (!rbd_opts)
407                 return ERR_PTR(-ENOMEM);
408
409         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410
411         ceph_opts = ceph_parse_options(options, mon_addr,
412                                         mon_addr + mon_addr_len,
413                                         parse_rbd_opts_token, rbd_opts);
414         if (IS_ERR(ceph_opts)) {
415                 kfree(rbd_opts);
416                 return ERR_CAST(ceph_opts);
417         }
418
419         spin_lock(&rbd_client_list_lock);
420         rbdc = __rbd_client_find(ceph_opts);
421         if (rbdc) {
422                 /* using an existing client */
423                 kref_get(&rbdc->kref);
424                 spin_unlock(&rbd_client_list_lock);
425
426                 ceph_destroy_options(ceph_opts);
427                 kfree(rbd_opts);
428
429                 return rbdc;
430         }
431         spin_unlock(&rbd_client_list_lock);
432
433         rbdc = rbd_client_create(ceph_opts, rbd_opts);
434
435         if (IS_ERR(rbdc))
436                 kfree(rbd_opts);
437
438         return rbdc;
439 }
440
441 /*
442  * Destroy ceph client
443  *
444  * Caller must hold rbd_client_list_lock.
445  */
446 static void rbd_client_release(struct kref *kref)
447 {
448         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449
450         dout("rbd_release_client %p\n", rbdc);
451         spin_lock(&rbd_client_list_lock);
452         list_del(&rbdc->node);
453         spin_unlock(&rbd_client_list_lock);
454
455         ceph_destroy_client(rbdc->client);
456         kfree(rbdc->rbd_opts);
457         kfree(rbdc);
458 }
459
460 /*
461  * Drop reference to ceph client node. If it's not referenced anymore, release
462  * it.
463  */
464 static void rbd_put_client(struct rbd_device *rbd_dev)
465 {
466         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467         rbd_dev->rbd_client = NULL;
468 }
469
470 /*
471  * Destroy requests collection
472  */
473 static void rbd_coll_release(struct kref *kref)
474 {
475         struct rbd_req_coll *coll =
476                 container_of(kref, struct rbd_req_coll, kref);
477
478         dout("rbd_coll_release %p\n", coll);
479         kfree(coll);
480 }
481
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
483 {
484         return !memcmp(&ondisk->text,
485                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
486 }
487
488 /*
489  * Create a new header structure, translate header format from the on-disk
490  * header.
491  */
492 static int rbd_header_from_disk(struct rbd_image_header *header,
493                                  struct rbd_image_header_ondisk *ondisk,
494                                  u32 allocated_snaps)
495 {
496         u32 snap_count;
497         size_t size;
498
499         if (!rbd_dev_ondisk_valid(ondisk))
500                 return -ENXIO;
501
502         snap_count = le32_to_cpu(ondisk->snap_count);
503
504         /* Make sure we don't overflow below */
505         size = SIZE_MAX - sizeof (struct ceph_snap_context);
506         if (snap_count > size / sizeof (header->snapc->snaps[0]))
507                 return -EINVAL;
508
509         size = sizeof (struct ceph_snap_context);
510         size += snap_count * sizeof (header->snapc->snaps[0]);
511         header->snapc = kmalloc(size, GFP_KERNEL);
512         if (!header->snapc)
513                 return -ENOMEM;
514
515         if (snap_count) {
516                 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
517                 BUG_ON(header->snap_names_len > (u64) SIZE_MAX);
518                 header->snap_names = kmalloc(header->snap_names_len,
519                                              GFP_KERNEL);
520                 if (!header->snap_names)
521                         goto err_snapc;
522                 size = snap_count * sizeof (*header->snap_sizes);
523                 header->snap_sizes = kmalloc(size, GFP_KERNEL);
524                 if (!header->snap_sizes)
525                         goto err_names;
526         } else {
527                 WARN_ON(ondisk->snap_names_len);
528                 header->snap_names_len = 0;
529                 header->snap_names = NULL;
530                 header->snap_sizes = NULL;
531         }
532
533         size = sizeof (ondisk->block_name) + 1;
534         header->object_prefix = kmalloc(size, GFP_KERNEL);
535         if (!header->object_prefix)
536                 goto err_sizes;
537         memcpy(header->object_prefix, ondisk->block_name, size - 1);
538         header->object_prefix[size - 1] = '\0';
539
540         header->image_size = le64_to_cpu(ondisk->image_size);
541         header->obj_order = ondisk->options.order;
542         header->crypt_type = ondisk->options.crypt_type;
543         header->comp_type = ondisk->options.comp_type;
544
545         atomic_set(&header->snapc->nref, 1);
546         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
547         header->snapc->num_snaps = snap_count;
548         header->total_snaps = snap_count;
549
550         if (snap_count && allocated_snaps == snap_count) {
551                 int i;
552
553                 for (i = 0; i < snap_count; i++) {
554                         header->snapc->snaps[i] =
555                                 le64_to_cpu(ondisk->snaps[i].id);
556                         header->snap_sizes[i] =
557                                 le64_to_cpu(ondisk->snaps[i].image_size);
558                 }
559
560                 /* copy snapshot names */
561                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
562                         header->snap_names_len);
563         }
564
565         return 0;
566
567 err_sizes:
568         kfree(header->snap_sizes);
569         header->snap_sizes = NULL;
570 err_names:
571         kfree(header->snap_names);
572         header->snap_names = NULL;
573         header->snap_names_len = 0;
574 err_snapc:
575         kfree(header->snapc);
576         header->snapc = NULL;
577
578         return -ENOMEM;
579 }
580
581 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
582                         u64 *seq, u64 *size)
583 {
584         int i;
585         char *p = header->snap_names;
586
587         for (i = 0; i < header->total_snaps; i++) {
588                 if (!strcmp(snap_name, p)) {
589
590                         /* Found it.  Pass back its id and/or size */
591
592                         if (seq)
593                                 *seq = header->snapc->snaps[i];
594                         if (size)
595                                 *size = header->snap_sizes[i];
596                         return i;
597                 }
598                 p += strlen(p) + 1;     /* Skip ahead to the next name */
599         }
600         return -ENOENT;
601 }
602
603 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
604 {
605         int ret;
606
607         down_write(&rbd_dev->header_rwsem);
608
609         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
610                     sizeof (RBD_SNAP_HEAD_NAME))) {
611                 rbd_dev->snap_id = CEPH_NOSNAP;
612                 rbd_dev->snap_exists = false;
613                 rbd_dev->read_only = 0;
614                 if (size)
615                         *size = rbd_dev->header.image_size;
616         } else {
617                 u64 snap_id = 0;
618
619                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
620                                         &snap_id, size);
621                 if (ret < 0)
622                         goto done;
623                 rbd_dev->snap_id = snap_id;
624                 rbd_dev->snap_exists = true;
625                 rbd_dev->read_only = 1;
626         }
627
628         ret = 0;
629 done:
630         up_write(&rbd_dev->header_rwsem);
631         return ret;
632 }
633
634 static void rbd_header_free(struct rbd_image_header *header)
635 {
636         kfree(header->object_prefix);
637         header->object_prefix = NULL;
638         kfree(header->snap_sizes);
639         header->snap_sizes = NULL;
640         kfree(header->snap_names);
641         header->snap_names = NULL;
642         header->snap_names_len = 0;
643         ceph_put_snap_context(header->snapc);
644         header->snapc = NULL;
645 }
646
647 /*
648  * get the actual striped segment name, offset and length
649  */
650 static u64 rbd_get_segment(struct rbd_image_header *header,
651                            const char *object_prefix,
652                            u64 ofs, u64 len,
653                            char *seg_name, u64 *segofs)
654 {
655         u64 seg = ofs >> header->obj_order;
656
657         if (seg_name)
658                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
659                          "%s.%012llx", object_prefix, seg);
660
661         ofs = ofs & ((1 << header->obj_order) - 1);
662         len = min_t(u64, len, (1 << header->obj_order) - ofs);
663
664         if (segofs)
665                 *segofs = ofs;
666
667         return len;
668 }
669
670 static int rbd_get_num_segments(struct rbd_image_header *header,
671                                 u64 ofs, u64 len)
672 {
673         u64 start_seg = ofs >> header->obj_order;
674         u64 end_seg = (ofs + len - 1) >> header->obj_order;
675         return end_seg - start_seg + 1;
676 }
677
678 /*
679  * returns the size of an object in the image
680  */
681 static u64 rbd_obj_bytes(struct rbd_image_header *header)
682 {
683         return 1 << header->obj_order;
684 }
685
686 /*
687  * bio helpers
688  */
689
690 static void bio_chain_put(struct bio *chain)
691 {
692         struct bio *tmp;
693
694         while (chain) {
695                 tmp = chain;
696                 chain = chain->bi_next;
697                 bio_put(tmp);
698         }
699 }
700
701 /*
702  * zeros a bio chain, starting at specific offset
703  */
704 static void zero_bio_chain(struct bio *chain, int start_ofs)
705 {
706         struct bio_vec *bv;
707         unsigned long flags;
708         void *buf;
709         int i;
710         int pos = 0;
711
712         while (chain) {
713                 bio_for_each_segment(bv, chain, i) {
714                         if (pos + bv->bv_len > start_ofs) {
715                                 int remainder = max(start_ofs - pos, 0);
716                                 buf = bvec_kmap_irq(bv, &flags);
717                                 memset(buf + remainder, 0,
718                                        bv->bv_len - remainder);
719                                 bvec_kunmap_irq(buf, &flags);
720                         }
721                         pos += bv->bv_len;
722                 }
723
724                 chain = chain->bi_next;
725         }
726 }
727
728 /*
729  * bio_chain_clone - clone a chain of bios up to a certain length.
730  * might return a bio_pair that will need to be released.
731  */
732 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
733                                    struct bio_pair **bp,
734                                    int len, gfp_t gfpmask)
735 {
736         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
737         int total = 0;
738
739         if (*bp) {
740                 bio_pair_release(*bp);
741                 *bp = NULL;
742         }
743
744         while (old_chain && (total < len)) {
745                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
746                 if (!tmp)
747                         goto err_out;
748
749                 if (total + old_chain->bi_size > len) {
750                         struct bio_pair *bp;
751
752                         /*
753                          * this split can only happen with a single paged bio,
754                          * split_bio will BUG_ON if this is not the case
755                          */
756                         dout("bio_chain_clone split! total=%d remaining=%d"
757                              "bi_size=%u\n",
758                              total, len - total, old_chain->bi_size);
759
760                         /* split the bio. We'll release it either in the next
761                            call, or it will have to be released outside */
762                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
763                         if (!bp)
764                                 goto err_out;
765
766                         __bio_clone(tmp, &bp->bio1);
767
768                         *next = &bp->bio2;
769                 } else {
770                         __bio_clone(tmp, old_chain);
771                         *next = old_chain->bi_next;
772                 }
773
774                 tmp->bi_bdev = NULL;
775                 gfpmask &= ~__GFP_WAIT;
776                 tmp->bi_next = NULL;
777
778                 if (!new_chain) {
779                         new_chain = tail = tmp;
780                 } else {
781                         tail->bi_next = tmp;
782                         tail = tmp;
783                 }
784                 old_chain = old_chain->bi_next;
785
786                 total += tmp->bi_size;
787         }
788
789         BUG_ON(total < len);
790
791         if (tail)
792                 tail->bi_next = NULL;
793
794         *old = old_chain;
795
796         return new_chain;
797
798 err_out:
799         dout("bio_chain_clone with err\n");
800         bio_chain_put(new_chain);
801         return NULL;
802 }
803
804 /*
805  * helpers for osd request op vectors.
806  */
807 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
808                                         int opcode, u32 payload_len)
809 {
810         struct ceph_osd_req_op *ops;
811
812         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
813         if (!ops)
814                 return NULL;
815
816         ops[0].op = opcode;
817
818         /*
819          * op extent offset and length will be set later on
820          * in calc_raw_layout()
821          */
822         ops[0].payload_len = payload_len;
823
824         return ops;
825 }
826
827 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
828 {
829         kfree(ops);
830 }
831
832 static void rbd_coll_end_req_index(struct request *rq,
833                                    struct rbd_req_coll *coll,
834                                    int index,
835                                    int ret, u64 len)
836 {
837         struct request_queue *q;
838         int min, max, i;
839
840         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
841              coll, index, ret, (unsigned long long) len);
842
843         if (!rq)
844                 return;
845
846         if (!coll) {
847                 blk_end_request(rq, ret, len);
848                 return;
849         }
850
851         q = rq->q;
852
853         spin_lock_irq(q->queue_lock);
854         coll->status[index].done = 1;
855         coll->status[index].rc = ret;
856         coll->status[index].bytes = len;
857         max = min = coll->num_done;
858         while (max < coll->total && coll->status[max].done)
859                 max++;
860
861         for (i = min; i<max; i++) {
862                 __blk_end_request(rq, coll->status[i].rc,
863                                   coll->status[i].bytes);
864                 coll->num_done++;
865                 kref_put(&coll->kref, rbd_coll_release);
866         }
867         spin_unlock_irq(q->queue_lock);
868 }
869
870 static void rbd_coll_end_req(struct rbd_request *req,
871                              int ret, u64 len)
872 {
873         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
874 }
875
876 /*
877  * Send ceph osd request
878  */
879 static int rbd_do_request(struct request *rq,
880                           struct rbd_device *rbd_dev,
881                           struct ceph_snap_context *snapc,
882                           u64 snapid,
883                           const char *object_name, u64 ofs, u64 len,
884                           struct bio *bio,
885                           struct page **pages,
886                           int num_pages,
887                           int flags,
888                           struct ceph_osd_req_op *ops,
889                           struct rbd_req_coll *coll,
890                           int coll_index,
891                           void (*rbd_cb)(struct ceph_osd_request *req,
892                                          struct ceph_msg *msg),
893                           struct ceph_osd_request **linger_req,
894                           u64 *ver)
895 {
896         struct ceph_osd_request *req;
897         struct ceph_file_layout *layout;
898         int ret;
899         u64 bno;
900         struct timespec mtime = CURRENT_TIME;
901         struct rbd_request *req_data;
902         struct ceph_osd_request_head *reqhead;
903         struct ceph_osd_client *osdc;
904
905         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
906         if (!req_data) {
907                 if (coll)
908                         rbd_coll_end_req_index(rq, coll, coll_index,
909                                                -ENOMEM, len);
910                 return -ENOMEM;
911         }
912
913         if (coll) {
914                 req_data->coll = coll;
915                 req_data->coll_index = coll_index;
916         }
917
918         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
919                 (unsigned long long) ofs, (unsigned long long) len);
920
921         osdc = &rbd_dev->rbd_client->client->osdc;
922         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
923                                         false, GFP_NOIO, pages, bio);
924         if (!req) {
925                 ret = -ENOMEM;
926                 goto done_pages;
927         }
928
929         req->r_callback = rbd_cb;
930
931         req_data->rq = rq;
932         req_data->bio = bio;
933         req_data->pages = pages;
934         req_data->len = len;
935
936         req->r_priv = req_data;
937
938         reqhead = req->r_request->front.iov_base;
939         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
940
941         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
942         req->r_oid_len = strlen(req->r_oid);
943
944         layout = &req->r_file_layout;
945         memset(layout, 0, sizeof(*layout));
946         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
947         layout->fl_stripe_count = cpu_to_le32(1);
948         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
949         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
950         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
951                                 req, ops);
952
953         ceph_osdc_build_request(req, ofs, &len,
954                                 ops,
955                                 snapc,
956                                 &mtime,
957                                 req->r_oid, req->r_oid_len);
958
959         if (linger_req) {
960                 ceph_osdc_set_request_linger(osdc, req);
961                 *linger_req = req;
962         }
963
964         ret = ceph_osdc_start_request(osdc, req, false);
965         if (ret < 0)
966                 goto done_err;
967
968         if (!rbd_cb) {
969                 ret = ceph_osdc_wait_request(osdc, req);
970                 if (ver)
971                         *ver = le64_to_cpu(req->r_reassert_version.version);
972                 dout("reassert_ver=%llu\n",
973                         (unsigned long long)
974                                 le64_to_cpu(req->r_reassert_version.version));
975                 ceph_osdc_put_request(req);
976         }
977         return ret;
978
979 done_err:
980         bio_chain_put(req_data->bio);
981         ceph_osdc_put_request(req);
982 done_pages:
983         rbd_coll_end_req(req_data, ret, len);
984         kfree(req_data);
985         return ret;
986 }
987
988 /*
989  * Ceph osd op callback
990  */
991 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
992 {
993         struct rbd_request *req_data = req->r_priv;
994         struct ceph_osd_reply_head *replyhead;
995         struct ceph_osd_op *op;
996         __s32 rc;
997         u64 bytes;
998         int read_op;
999
1000         /* parse reply */
1001         replyhead = msg->front.iov_base;
1002         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
1003         op = (void *)(replyhead + 1);
1004         rc = le32_to_cpu(replyhead->result);
1005         bytes = le64_to_cpu(op->extent.length);
1006         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
1007
1008         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1009                 (unsigned long long) bytes, read_op, (int) rc);
1010
1011         if (rc == -ENOENT && read_op) {
1012                 zero_bio_chain(req_data->bio, 0);
1013                 rc = 0;
1014         } else if (rc == 0 && read_op && bytes < req_data->len) {
1015                 zero_bio_chain(req_data->bio, bytes);
1016                 bytes = req_data->len;
1017         }
1018
1019         rbd_coll_end_req(req_data, rc, bytes);
1020
1021         if (req_data->bio)
1022                 bio_chain_put(req_data->bio);
1023
1024         ceph_osdc_put_request(req);
1025         kfree(req_data);
1026 }
1027
1028 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1029 {
1030         ceph_osdc_put_request(req);
1031 }
1032
1033 /*
1034  * Do a synchronous ceph osd operation
1035  */
1036 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1037                            struct ceph_snap_context *snapc,
1038                            u64 snapid,
1039                            int flags,
1040                            struct ceph_osd_req_op *ops,
1041                            const char *object_name,
1042                            u64 ofs, u64 len,
1043                            char *buf,
1044                            struct ceph_osd_request **linger_req,
1045                            u64 *ver)
1046 {
1047         int ret;
1048         struct page **pages;
1049         int num_pages;
1050
1051         BUG_ON(ops == NULL);
1052
1053         num_pages = calc_pages_for(ofs , len);
1054         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1055         if (IS_ERR(pages))
1056                 return PTR_ERR(pages);
1057
1058         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1059                           object_name, ofs, len, NULL,
1060                           pages, num_pages,
1061                           flags,
1062                           ops,
1063                           NULL, 0,
1064                           NULL,
1065                           linger_req, ver);
1066         if (ret < 0)
1067                 goto done;
1068
1069         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1070                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1071
1072 done:
1073         ceph_release_page_vector(pages, num_pages);
1074         return ret;
1075 }
1076
1077 /*
1078  * Do an asynchronous ceph osd operation
1079  */
1080 static int rbd_do_op(struct request *rq,
1081                      struct rbd_device *rbd_dev,
1082                      struct ceph_snap_context *snapc,
1083                      u64 snapid,
1084                      int opcode, int flags,
1085                      u64 ofs, u64 len,
1086                      struct bio *bio,
1087                      struct rbd_req_coll *coll,
1088                      int coll_index)
1089 {
1090         char *seg_name;
1091         u64 seg_ofs;
1092         u64 seg_len;
1093         int ret;
1094         struct ceph_osd_req_op *ops;
1095         u32 payload_len;
1096
1097         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1098         if (!seg_name)
1099                 return -ENOMEM;
1100
1101         seg_len = rbd_get_segment(&rbd_dev->header,
1102                                   rbd_dev->header.object_prefix,
1103                                   ofs, len,
1104                                   seg_name, &seg_ofs);
1105
1106         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1107
1108         ret = -ENOMEM;
1109         ops = rbd_create_rw_ops(1, opcode, payload_len);
1110         if (!ops)
1111                 goto done;
1112
1113         /* we've taken care of segment sizes earlier when we
1114            cloned the bios. We should never have a segment
1115            truncated at this point */
1116         BUG_ON(seg_len < len);
1117
1118         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1119                              seg_name, seg_ofs, seg_len,
1120                              bio,
1121                              NULL, 0,
1122                              flags,
1123                              ops,
1124                              coll, coll_index,
1125                              rbd_req_cb, 0, NULL);
1126
1127         rbd_destroy_ops(ops);
1128 done:
1129         kfree(seg_name);
1130         return ret;
1131 }
1132
1133 /*
1134  * Request async osd write
1135  */
1136 static int rbd_req_write(struct request *rq,
1137                          struct rbd_device *rbd_dev,
1138                          struct ceph_snap_context *snapc,
1139                          u64 ofs, u64 len,
1140                          struct bio *bio,
1141                          struct rbd_req_coll *coll,
1142                          int coll_index)
1143 {
1144         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1145                          CEPH_OSD_OP_WRITE,
1146                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1147                          ofs, len, bio, coll, coll_index);
1148 }
1149
1150 /*
1151  * Request async osd read
1152  */
1153 static int rbd_req_read(struct request *rq,
1154                          struct rbd_device *rbd_dev,
1155                          u64 snapid,
1156                          u64 ofs, u64 len,
1157                          struct bio *bio,
1158                          struct rbd_req_coll *coll,
1159                          int coll_index)
1160 {
1161         return rbd_do_op(rq, rbd_dev, NULL,
1162                          snapid,
1163                          CEPH_OSD_OP_READ,
1164                          CEPH_OSD_FLAG_READ,
1165                          ofs, len, bio, coll, coll_index);
1166 }
1167
1168 /*
1169  * Request sync osd read
1170  */
1171 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1172                           u64 snapid,
1173                           const char *object_name,
1174                           u64 ofs, u64 len,
1175                           char *buf,
1176                           u64 *ver)
1177 {
1178         struct ceph_osd_req_op *ops;
1179         int ret;
1180
1181         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1182         if (!ops)
1183                 return -ENOMEM;
1184
1185         ret = rbd_req_sync_op(rbd_dev, NULL,
1186                                snapid,
1187                                CEPH_OSD_FLAG_READ,
1188                                ops, object_name, ofs, len, buf, NULL, ver);
1189         rbd_destroy_ops(ops);
1190
1191         return ret;
1192 }
1193
1194 /*
1195  * Request sync osd watch
1196  */
1197 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1198                                    u64 ver,
1199                                    u64 notify_id)
1200 {
1201         struct ceph_osd_req_op *ops;
1202         int ret;
1203
1204         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1205         if (!ops)
1206                 return -ENOMEM;
1207
1208         ops[0].watch.ver = cpu_to_le64(ver);
1209         ops[0].watch.cookie = notify_id;
1210         ops[0].watch.flag = 0;
1211
1212         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1213                           rbd_dev->header_name, 0, 0, NULL,
1214                           NULL, 0,
1215                           CEPH_OSD_FLAG_READ,
1216                           ops,
1217                           NULL, 0,
1218                           rbd_simple_req_cb, 0, NULL);
1219
1220         rbd_destroy_ops(ops);
1221         return ret;
1222 }
1223
1224 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1225 {
1226         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1227         u64 hver;
1228         int rc;
1229
1230         if (!rbd_dev)
1231                 return;
1232
1233         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1234                 rbd_dev->header_name, (unsigned long long) notify_id,
1235                 (unsigned int) opcode);
1236         rc = rbd_refresh_header(rbd_dev, &hver);
1237         if (rc)
1238                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1239                            " update snaps: %d\n", rbd_dev->major, rc);
1240
1241         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1242 }
1243
1244 /*
1245  * Request sync osd watch
1246  */
1247 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1248 {
1249         struct ceph_osd_req_op *ops;
1250         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1251         int ret;
1252
1253         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1254         if (!ops)
1255                 return -ENOMEM;
1256
1257         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1258                                      (void *)rbd_dev, &rbd_dev->watch_event);
1259         if (ret < 0)
1260                 goto fail;
1261
1262         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1263         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1264         ops[0].watch.flag = 1;
1265
1266         ret = rbd_req_sync_op(rbd_dev, NULL,
1267                               CEPH_NOSNAP,
1268                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1269                               ops,
1270                               rbd_dev->header_name,
1271                               0, 0, NULL,
1272                               &rbd_dev->watch_request, NULL);
1273
1274         if (ret < 0)
1275                 goto fail_event;
1276
1277         rbd_destroy_ops(ops);
1278         return 0;
1279
1280 fail_event:
1281         ceph_osdc_cancel_event(rbd_dev->watch_event);
1282         rbd_dev->watch_event = NULL;
1283 fail:
1284         rbd_destroy_ops(ops);
1285         return ret;
1286 }
1287
1288 /*
1289  * Request sync osd unwatch
1290  */
1291 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1292 {
1293         struct ceph_osd_req_op *ops;
1294         int ret;
1295
1296         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1297         if (!ops)
1298                 return -ENOMEM;
1299
1300         ops[0].watch.ver = 0;
1301         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1302         ops[0].watch.flag = 0;
1303
1304         ret = rbd_req_sync_op(rbd_dev, NULL,
1305                               CEPH_NOSNAP,
1306                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1307                               ops,
1308                               rbd_dev->header_name,
1309                               0, 0, NULL, NULL, NULL);
1310
1311
1312         rbd_destroy_ops(ops);
1313         ceph_osdc_cancel_event(rbd_dev->watch_event);
1314         rbd_dev->watch_event = NULL;
1315         return ret;
1316 }
1317
1318 struct rbd_notify_info {
1319         struct rbd_device *rbd_dev;
1320 };
1321
1322 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1323 {
1324         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1325         if (!rbd_dev)
1326                 return;
1327
1328         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1329                         rbd_dev->header_name, (unsigned long long) notify_id,
1330                         (unsigned int) opcode);
1331 }
1332
1333 /*
1334  * Request sync osd notify
1335  */
1336 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1337 {
1338         struct ceph_osd_req_op *ops;
1339         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1340         struct ceph_osd_event *event;
1341         struct rbd_notify_info info;
1342         int payload_len = sizeof(u32) + sizeof(u32);
1343         int ret;
1344
1345         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1346         if (!ops)
1347                 return -ENOMEM;
1348
1349         info.rbd_dev = rbd_dev;
1350
1351         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1352                                      (void *)&info, &event);
1353         if (ret < 0)
1354                 goto fail;
1355
1356         ops[0].watch.ver = 1;
1357         ops[0].watch.flag = 1;
1358         ops[0].watch.cookie = event->cookie;
1359         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1360         ops[0].watch.timeout = 12;
1361
1362         ret = rbd_req_sync_op(rbd_dev, NULL,
1363                                CEPH_NOSNAP,
1364                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1365                                ops,
1366                                rbd_dev->header_name,
1367                                0, 0, NULL, NULL, NULL);
1368         if (ret < 0)
1369                 goto fail_event;
1370
1371         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1372         dout("ceph_osdc_wait_event returned %d\n", ret);
1373         rbd_destroy_ops(ops);
1374         return 0;
1375
1376 fail_event:
1377         ceph_osdc_cancel_event(event);
1378 fail:
1379         rbd_destroy_ops(ops);
1380         return ret;
1381 }
1382
1383 /*
1384  * Request sync osd read
1385  */
1386 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1387                              const char *object_name,
1388                              const char *class_name,
1389                              const char *method_name,
1390                              const char *data,
1391                              int len,
1392                              u64 *ver)
1393 {
1394         struct ceph_osd_req_op *ops;
1395         int class_name_len = strlen(class_name);
1396         int method_name_len = strlen(method_name);
1397         int ret;
1398
1399         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1400                                     class_name_len + method_name_len + len);
1401         if (!ops)
1402                 return -ENOMEM;
1403
1404         ops[0].cls.class_name = class_name;
1405         ops[0].cls.class_len = (__u8) class_name_len;
1406         ops[0].cls.method_name = method_name;
1407         ops[0].cls.method_len = (__u8) method_name_len;
1408         ops[0].cls.argc = 0;
1409         ops[0].cls.indata = data;
1410         ops[0].cls.indata_len = len;
1411
1412         ret = rbd_req_sync_op(rbd_dev, NULL,
1413                                CEPH_NOSNAP,
1414                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1415                                ops,
1416                                object_name, 0, 0, NULL, NULL, ver);
1417
1418         rbd_destroy_ops(ops);
1419
1420         dout("cls_exec returned %d\n", ret);
1421         return ret;
1422 }
1423
1424 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1425 {
1426         struct rbd_req_coll *coll =
1427                         kzalloc(sizeof(struct rbd_req_coll) +
1428                                 sizeof(struct rbd_req_status) * num_reqs,
1429                                 GFP_ATOMIC);
1430
1431         if (!coll)
1432                 return NULL;
1433         coll->total = num_reqs;
1434         kref_init(&coll->kref);
1435         return coll;
1436 }
1437
1438 /*
1439  * block device queue callback
1440  */
1441 static void rbd_rq_fn(struct request_queue *q)
1442 {
1443         struct rbd_device *rbd_dev = q->queuedata;
1444         struct request *rq;
1445         struct bio_pair *bp = NULL;
1446
1447         while ((rq = blk_fetch_request(q))) {
1448                 struct bio *bio;
1449                 struct bio *rq_bio, *next_bio = NULL;
1450                 bool do_write;
1451                 unsigned int size;
1452                 u64 op_size = 0;
1453                 u64 ofs;
1454                 int num_segs, cur_seg = 0;
1455                 struct rbd_req_coll *coll;
1456                 struct ceph_snap_context *snapc;
1457
1458                 /* peek at request from block layer */
1459                 if (!rq)
1460                         break;
1461
1462                 dout("fetched request\n");
1463
1464                 /* filter out block requests we don't understand */
1465                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1466                         __blk_end_request_all(rq, 0);
1467                         continue;
1468                 }
1469
1470                 /* deduce our operation (read, write) */
1471                 do_write = (rq_data_dir(rq) == WRITE);
1472
1473                 size = blk_rq_bytes(rq);
1474                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1475                 rq_bio = rq->bio;
1476                 if (do_write && rbd_dev->read_only) {
1477                         __blk_end_request_all(rq, -EROFS);
1478                         continue;
1479                 }
1480
1481                 spin_unlock_irq(q->queue_lock);
1482
1483                 down_read(&rbd_dev->header_rwsem);
1484
1485                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1486                         up_read(&rbd_dev->header_rwsem);
1487                         dout("request for non-existent snapshot");
1488                         spin_lock_irq(q->queue_lock);
1489                         __blk_end_request_all(rq, -ENXIO);
1490                         continue;
1491                 }
1492
1493                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1494
1495                 up_read(&rbd_dev->header_rwsem);
1496
1497                 dout("%s 0x%x bytes at 0x%llx\n",
1498                      do_write ? "write" : "read",
1499                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1500
1501                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1502                 coll = rbd_alloc_coll(num_segs);
1503                 if (!coll) {
1504                         spin_lock_irq(q->queue_lock);
1505                         __blk_end_request_all(rq, -ENOMEM);
1506                         ceph_put_snap_context(snapc);
1507                         continue;
1508                 }
1509
1510                 do {
1511                         /* a bio clone to be passed down to OSD req */
1512                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1513                         op_size = rbd_get_segment(&rbd_dev->header,
1514                                                   rbd_dev->header.object_prefix,
1515                                                   ofs, size,
1516                                                   NULL, NULL);
1517                         kref_get(&coll->kref);
1518                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1519                                               op_size, GFP_ATOMIC);
1520                         if (!bio) {
1521                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1522                                                        -ENOMEM, op_size);
1523                                 goto next_seg;
1524                         }
1525
1526
1527                         /* init OSD command: write or read */
1528                         if (do_write)
1529                                 rbd_req_write(rq, rbd_dev,
1530                                               snapc,
1531                                               ofs,
1532                                               op_size, bio,
1533                                               coll, cur_seg);
1534                         else
1535                                 rbd_req_read(rq, rbd_dev,
1536                                              rbd_dev->snap_id,
1537                                              ofs,
1538                                              op_size, bio,
1539                                              coll, cur_seg);
1540
1541 next_seg:
1542                         size -= op_size;
1543                         ofs += op_size;
1544
1545                         cur_seg++;
1546                         rq_bio = next_bio;
1547                 } while (size > 0);
1548                 kref_put(&coll->kref, rbd_coll_release);
1549
1550                 if (bp)
1551                         bio_pair_release(bp);
1552                 spin_lock_irq(q->queue_lock);
1553
1554                 ceph_put_snap_context(snapc);
1555         }
1556 }
1557
1558 /*
1559  * a queue callback. Makes sure that we don't create a bio that spans across
1560  * multiple osd objects. One exception would be with a single page bios,
1561  * which we handle later at bio_chain_clone
1562  */
1563 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1564                           struct bio_vec *bvec)
1565 {
1566         struct rbd_device *rbd_dev = q->queuedata;
1567         unsigned int chunk_sectors;
1568         sector_t sector;
1569         unsigned int bio_sectors;
1570         int max;
1571
1572         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1573         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1574         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1575
1576         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1577                                  + bio_sectors)) << SECTOR_SHIFT;
1578         if (max < 0)
1579                 max = 0; /* bio_add cannot handle a negative return */
1580         if (max <= bvec->bv_len && bio_sectors == 0)
1581                 return bvec->bv_len;
1582         return max;
1583 }
1584
1585 static void rbd_free_disk(struct rbd_device *rbd_dev)
1586 {
1587         struct gendisk *disk = rbd_dev->disk;
1588
1589         if (!disk)
1590                 return;
1591
1592         rbd_header_free(&rbd_dev->header);
1593
1594         if (disk->flags & GENHD_FL_UP)
1595                 del_gendisk(disk);
1596         if (disk->queue)
1597                 blk_cleanup_queue(disk->queue);
1598         put_disk(disk);
1599 }
1600
1601 /*
1602  * reload the ondisk the header 
1603  */
1604 static int rbd_read_header(struct rbd_device *rbd_dev,
1605                            struct rbd_image_header *header)
1606 {
1607         ssize_t rc;
1608         struct rbd_image_header_ondisk *dh;
1609         u32 snap_count = 0;
1610         u64 ver;
1611         size_t len;
1612
1613         /*
1614          * First reads the fixed-size header to determine the number
1615          * of snapshots, then re-reads it, along with all snapshot
1616          * records as well as their stored names.
1617          */
1618         len = sizeof (*dh);
1619         while (1) {
1620                 dh = kmalloc(len, GFP_KERNEL);
1621                 if (!dh)
1622                         return -ENOMEM;
1623
1624                 rc = rbd_req_sync_read(rbd_dev,
1625                                        CEPH_NOSNAP,
1626                                        rbd_dev->header_name,
1627                                        0, len,
1628                                        (char *)dh, &ver);
1629                 if (rc < 0)
1630                         goto out_dh;
1631
1632                 rc = rbd_header_from_disk(header, dh, snap_count);
1633                 if (rc < 0) {
1634                         if (rc == -ENXIO)
1635                                 pr_warning("unrecognized header format"
1636                                            " for image %s\n",
1637                                            rbd_dev->image_name);
1638                         goto out_dh;
1639                 }
1640
1641                 if (snap_count == header->total_snaps)
1642                         break;
1643
1644                 snap_count = header->total_snaps;
1645                 len = sizeof (*dh) +
1646                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1647                         header->snap_names_len;
1648
1649                 rbd_header_free(header);
1650                 kfree(dh);
1651         }
1652         header->obj_version = ver;
1653
1654 out_dh:
1655         kfree(dh);
1656         return rc;
1657 }
1658
1659 /*
1660  * create a snapshot
1661  */
1662 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1663                                const char *snap_name,
1664                                gfp_t gfp_flags)
1665 {
1666         int name_len = strlen(snap_name);
1667         u64 new_snapid;
1668         int ret;
1669         void *data, *p, *e;
1670         struct ceph_mon_client *monc;
1671
1672         /* we should create a snapshot only if we're pointing at the head */
1673         if (rbd_dev->snap_id != CEPH_NOSNAP)
1674                 return -EINVAL;
1675
1676         monc = &rbd_dev->rbd_client->client->monc;
1677         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1678         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1679         if (ret < 0)
1680                 return ret;
1681
1682         data = kmalloc(name_len + 16, gfp_flags);
1683         if (!data)
1684                 return -ENOMEM;
1685
1686         p = data;
1687         e = data + name_len + 16;
1688
1689         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1690         ceph_encode_64_safe(&p, e, new_snapid, bad);
1691
1692         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1693                                 "rbd", "snap_add",
1694                                 data, p - data, NULL);
1695
1696         kfree(data);
1697
1698         return ret < 0 ? ret : 0;
1699 bad:
1700         return -ERANGE;
1701 }
1702
1703 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1704 {
1705         struct rbd_snap *snap;
1706         struct rbd_snap *next;
1707
1708         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1709                 __rbd_remove_snap_dev(snap);
1710 }
1711
1712 /*
1713  * only read the first part of the ondisk header, without the snaps info
1714  */
1715 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1716 {
1717         int ret;
1718         struct rbd_image_header h;
1719
1720         ret = rbd_read_header(rbd_dev, &h);
1721         if (ret < 0)
1722                 return ret;
1723
1724         down_write(&rbd_dev->header_rwsem);
1725
1726         /* resized? */
1727         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1728                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1729
1730                 dout("setting size to %llu sectors", (unsigned long long) size);
1731                 set_capacity(rbd_dev->disk, size);
1732         }
1733
1734         /* rbd_dev->header.object_prefix shouldn't change */
1735         kfree(rbd_dev->header.snap_sizes);
1736         kfree(rbd_dev->header.snap_names);
1737         /* osd requests may still refer to snapc */
1738         ceph_put_snap_context(rbd_dev->header.snapc);
1739
1740         if (hver)
1741                 *hver = h.obj_version;
1742         rbd_dev->header.obj_version = h.obj_version;
1743         rbd_dev->header.image_size = h.image_size;
1744         rbd_dev->header.total_snaps = h.total_snaps;
1745         rbd_dev->header.snapc = h.snapc;
1746         rbd_dev->header.snap_names = h.snap_names;
1747         rbd_dev->header.snap_names_len = h.snap_names_len;
1748         rbd_dev->header.snap_sizes = h.snap_sizes;
1749         /* Free the extra copy of the object prefix */
1750         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1751         kfree(h.object_prefix);
1752
1753         ret = __rbd_init_snaps_header(rbd_dev);
1754
1755         up_write(&rbd_dev->header_rwsem);
1756
1757         return ret;
1758 }
1759
1760 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1761 {
1762         int ret;
1763
1764         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1765         ret = __rbd_refresh_header(rbd_dev, hver);
1766         mutex_unlock(&ctl_mutex);
1767
1768         return ret;
1769 }
1770
1771 static int rbd_init_disk(struct rbd_device *rbd_dev)
1772 {
1773         struct gendisk *disk;
1774         struct request_queue *q;
1775         int rc;
1776         u64 segment_size;
1777         u64 total_size = 0;
1778
1779         /* contact OSD, request size info about the object being mapped */
1780         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1781         if (rc)
1782                 return rc;
1783
1784         /* no need to lock here, as rbd_dev is not registered yet */
1785         rc = __rbd_init_snaps_header(rbd_dev);
1786         if (rc)
1787                 return rc;
1788
1789         rc = rbd_header_set_snap(rbd_dev, &total_size);
1790         if (rc)
1791                 return rc;
1792
1793         /* create gendisk info */
1794         rc = -ENOMEM;
1795         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1796         if (!disk)
1797                 goto out;
1798
1799         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1800                  rbd_dev->dev_id);
1801         disk->major = rbd_dev->major;
1802         disk->first_minor = 0;
1803         disk->fops = &rbd_bd_ops;
1804         disk->private_data = rbd_dev;
1805
1806         /* init rq */
1807         rc = -ENOMEM;
1808         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1809         if (!q)
1810                 goto out_disk;
1811
1812         /* We use the default size, but let's be explicit about it. */
1813         blk_queue_physical_block_size(q, SECTOR_SIZE);
1814
1815         /* set io sizes to object size */
1816         segment_size = rbd_obj_bytes(&rbd_dev->header);
1817         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1818         blk_queue_max_segment_size(q, segment_size);
1819         blk_queue_io_min(q, segment_size);
1820         blk_queue_io_opt(q, segment_size);
1821
1822         blk_queue_merge_bvec(q, rbd_merge_bvec);
1823         disk->queue = q;
1824
1825         q->queuedata = rbd_dev;
1826
1827         rbd_dev->disk = disk;
1828         rbd_dev->q = q;
1829
1830         /* finally, announce the disk to the world */
1831         set_capacity(disk, total_size / SECTOR_SIZE);
1832         add_disk(disk);
1833
1834         pr_info("%s: added with size 0x%llx\n",
1835                 disk->disk_name, (unsigned long long)total_size);
1836         return 0;
1837
1838 out_disk:
1839         put_disk(disk);
1840 out:
1841         return rc;
1842 }
1843
1844 /*
1845   sysfs
1846 */
1847
1848 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1849 {
1850         return container_of(dev, struct rbd_device, dev);
1851 }
1852
1853 static ssize_t rbd_size_show(struct device *dev,
1854                              struct device_attribute *attr, char *buf)
1855 {
1856         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1857         sector_t size;
1858
1859         down_read(&rbd_dev->header_rwsem);
1860         size = get_capacity(rbd_dev->disk);
1861         up_read(&rbd_dev->header_rwsem);
1862
1863         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1864 }
1865
1866 static ssize_t rbd_major_show(struct device *dev,
1867                               struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "%d\n", rbd_dev->major);
1872 }
1873
1874 static ssize_t rbd_client_id_show(struct device *dev,
1875                                   struct device_attribute *attr, char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "client%lld\n",
1880                         ceph_client_id(rbd_dev->rbd_client->client));
1881 }
1882
1883 static ssize_t rbd_pool_show(struct device *dev,
1884                              struct device_attribute *attr, char *buf)
1885 {
1886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1889 }
1890
1891 static ssize_t rbd_pool_id_show(struct device *dev,
1892                              struct device_attribute *attr, char *buf)
1893 {
1894         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1895
1896         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1897 }
1898
1899 static ssize_t rbd_name_show(struct device *dev,
1900                              struct device_attribute *attr, char *buf)
1901 {
1902         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903
1904         return sprintf(buf, "%s\n", rbd_dev->image_name);
1905 }
1906
1907 static ssize_t rbd_snap_show(struct device *dev,
1908                              struct device_attribute *attr,
1909                              char *buf)
1910 {
1911         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1912
1913         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1914 }
1915
1916 static ssize_t rbd_image_refresh(struct device *dev,
1917                                  struct device_attribute *attr,
1918                                  const char *buf,
1919                                  size_t size)
1920 {
1921         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1922         int ret;
1923
1924         ret = rbd_refresh_header(rbd_dev, NULL);
1925
1926         return ret < 0 ? ret : size;
1927 }
1928
1929 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1930 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1931 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1932 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1933 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1934 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1935 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1936 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1937 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1938
1939 static struct attribute *rbd_attrs[] = {
1940         &dev_attr_size.attr,
1941         &dev_attr_major.attr,
1942         &dev_attr_client_id.attr,
1943         &dev_attr_pool.attr,
1944         &dev_attr_pool_id.attr,
1945         &dev_attr_name.attr,
1946         &dev_attr_current_snap.attr,
1947         &dev_attr_refresh.attr,
1948         &dev_attr_create_snap.attr,
1949         NULL
1950 };
1951
1952 static struct attribute_group rbd_attr_group = {
1953         .attrs = rbd_attrs,
1954 };
1955
1956 static const struct attribute_group *rbd_attr_groups[] = {
1957         &rbd_attr_group,
1958         NULL
1959 };
1960
1961 static void rbd_sysfs_dev_release(struct device *dev)
1962 {
1963 }
1964
1965 static struct device_type rbd_device_type = {
1966         .name           = "rbd",
1967         .groups         = rbd_attr_groups,
1968         .release        = rbd_sysfs_dev_release,
1969 };
1970
1971
1972 /*
1973   sysfs - snapshots
1974 */
1975
1976 static ssize_t rbd_snap_size_show(struct device *dev,
1977                                   struct device_attribute *attr,
1978                                   char *buf)
1979 {
1980         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981
1982         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1983 }
1984
1985 static ssize_t rbd_snap_id_show(struct device *dev,
1986                                 struct device_attribute *attr,
1987                                 char *buf)
1988 {
1989         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1990
1991         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1992 }
1993
1994 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1995 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1996
1997 static struct attribute *rbd_snap_attrs[] = {
1998         &dev_attr_snap_size.attr,
1999         &dev_attr_snap_id.attr,
2000         NULL,
2001 };
2002
2003 static struct attribute_group rbd_snap_attr_group = {
2004         .attrs = rbd_snap_attrs,
2005 };
2006
2007 static void rbd_snap_dev_release(struct device *dev)
2008 {
2009         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2010         kfree(snap->name);
2011         kfree(snap);
2012 }
2013
2014 static const struct attribute_group *rbd_snap_attr_groups[] = {
2015         &rbd_snap_attr_group,
2016         NULL
2017 };
2018
2019 static struct device_type rbd_snap_device_type = {
2020         .groups         = rbd_snap_attr_groups,
2021         .release        = rbd_snap_dev_release,
2022 };
2023
2024 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2025 {
2026         list_del(&snap->node);
2027         device_unregister(&snap->dev);
2028 }
2029
2030 static int rbd_register_snap_dev(struct rbd_snap *snap,
2031                                   struct device *parent)
2032 {
2033         struct device *dev = &snap->dev;
2034         int ret;
2035
2036         dev->type = &rbd_snap_device_type;
2037         dev->parent = parent;
2038         dev->release = rbd_snap_dev_release;
2039         dev_set_name(dev, "snap_%s", snap->name);
2040         ret = device_register(dev);
2041
2042         return ret;
2043 }
2044
2045 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2046                                               int i, const char *name)
2047 {
2048         struct rbd_snap *snap;
2049         int ret;
2050
2051         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2052         if (!snap)
2053                 return ERR_PTR(-ENOMEM);
2054
2055         ret = -ENOMEM;
2056         snap->name = kstrdup(name, GFP_KERNEL);
2057         if (!snap->name)
2058                 goto err;
2059
2060         snap->size = rbd_dev->header.snap_sizes[i];
2061         snap->id = rbd_dev->header.snapc->snaps[i];
2062         if (device_is_registered(&rbd_dev->dev)) {
2063                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2064                 if (ret < 0)
2065                         goto err;
2066         }
2067
2068         return snap;
2069
2070 err:
2071         kfree(snap->name);
2072         kfree(snap);
2073
2074         return ERR_PTR(ret);
2075 }
2076
2077 /*
2078  * Scan the rbd device's current snapshot list and compare it to the
2079  * newly-received snapshot context.  Remove any existing snapshots
2080  * not present in the new snapshot context.  Add a new snapshot for
2081  * any snaphots in the snapshot context not in the current list.
2082  * And verify there are no changes to snapshots we already know
2083  * about.
2084  *
2085  * Assumes the snapshots in the snapshot context are sorted by
2086  * snapshot id, highest id first.  (Snapshots in the rbd_dev's list
2087  * are also maintained in that order.)
2088  */
2089 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2090 {
2091         struct ceph_snap_context *snapc = rbd_dev->header.snapc;
2092         const u32 snap_count = snapc->num_snaps;
2093         char *snap_name = rbd_dev->header.snap_names;
2094         struct list_head *head = &rbd_dev->snaps;
2095         struct list_head *links = head->next;
2096         u32 index = 0;
2097
2098         while (index < snap_count || links != head) {
2099                 u64 snap_id;
2100                 struct rbd_snap *snap;
2101
2102                 snap_id = index < snap_count ? snapc->snaps[index]
2103                                              : CEPH_NOSNAP;
2104                 snap = links != head ? list_entry(links, struct rbd_snap, node)
2105                                      : NULL;
2106                 BUG_ON(snap && snap->id == CEPH_NOSNAP);
2107
2108                 if (snap_id == CEPH_NOSNAP || (snap && snap->id > snap_id)) {
2109                         struct list_head *next = links->next;
2110
2111                         /* Existing snapshot not in the new snap context */
2112
2113                         if (rbd_dev->snap_id == snap->id)
2114                                 rbd_dev->snap_exists = false;
2115                         __rbd_remove_snap_dev(snap);
2116
2117                         /* Done with this list entry; advance */
2118
2119                         links = next;
2120                         continue;
2121                 }
2122
2123                 if (!snap || (snap_id != CEPH_NOSNAP && snap->id < snap_id)) {
2124                         struct rbd_snap *new_snap;
2125
2126                         /* We haven't seen this snapshot before */
2127
2128                         new_snap = __rbd_add_snap_dev(rbd_dev, index,
2129                                                         snap_name);
2130                         if (IS_ERR(new_snap))
2131                                 return PTR_ERR(new_snap);
2132
2133                         /* New goes before existing, or at end of list */
2134
2135                         if (snap)
2136                                 list_add_tail(&new_snap->node, &snap->node);
2137                         else
2138                                 list_add(&new_snap->node, head);
2139                 } else {
2140                         /* Already have this one */
2141
2142                         BUG_ON(snap->size != rbd_dev->header.snap_sizes[index]);
2143                         BUG_ON(strcmp(snap->name, snap_name));
2144
2145                         /* Done with this list entry; advance */
2146
2147                         links = links->next;
2148                 }
2149
2150                 /* Advance to the next entry in the snapshot context */
2151
2152                 index++;
2153                 snap_name += strlen(snap_name) + 1;
2154         }
2155
2156         return 0;
2157 }
2158
2159 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2160 {
2161         int ret;
2162         struct device *dev;
2163         struct rbd_snap *snap;
2164
2165         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2166         dev = &rbd_dev->dev;
2167
2168         dev->bus = &rbd_bus_type;
2169         dev->type = &rbd_device_type;
2170         dev->parent = &rbd_root_dev;
2171         dev->release = rbd_dev_release;
2172         dev_set_name(dev, "%d", rbd_dev->dev_id);
2173         ret = device_register(dev);
2174         if (ret < 0)
2175                 goto out;
2176
2177         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2178                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2179                 if (ret < 0)
2180                         break;
2181         }
2182 out:
2183         mutex_unlock(&ctl_mutex);
2184         return ret;
2185 }
2186
2187 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2188 {
2189         device_unregister(&rbd_dev->dev);
2190 }
2191
2192 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2193 {
2194         int ret, rc;
2195
2196         do {
2197                 ret = rbd_req_sync_watch(rbd_dev);
2198                 if (ret == -ERANGE) {
2199                         rc = rbd_refresh_header(rbd_dev, NULL);
2200                         if (rc < 0)
2201                                 return rc;
2202                 }
2203         } while (ret == -ERANGE);
2204
2205         return ret;
2206 }
2207
2208 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2209
2210 /*
2211  * Get a unique rbd identifier for the given new rbd_dev, and add
2212  * the rbd_dev to the global list.  The minimum rbd id is 1.
2213  */
2214 static void rbd_id_get(struct rbd_device *rbd_dev)
2215 {
2216         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2217
2218         spin_lock(&rbd_dev_list_lock);
2219         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2220         spin_unlock(&rbd_dev_list_lock);
2221 }
2222
2223 /*
2224  * Remove an rbd_dev from the global list, and record that its
2225  * identifier is no longer in use.
2226  */
2227 static void rbd_id_put(struct rbd_device *rbd_dev)
2228 {
2229         struct list_head *tmp;
2230         int rbd_id = rbd_dev->dev_id;
2231         int max_id;
2232
2233         BUG_ON(rbd_id < 1);
2234
2235         spin_lock(&rbd_dev_list_lock);
2236         list_del_init(&rbd_dev->node);
2237
2238         /*
2239          * If the id being "put" is not the current maximum, there
2240          * is nothing special we need to do.
2241          */
2242         if (rbd_id != atomic64_read(&rbd_id_max)) {
2243                 spin_unlock(&rbd_dev_list_lock);
2244                 return;
2245         }
2246
2247         /*
2248          * We need to update the current maximum id.  Search the
2249          * list to find out what it is.  We're more likely to find
2250          * the maximum at the end, so search the list backward.
2251          */
2252         max_id = 0;
2253         list_for_each_prev(tmp, &rbd_dev_list) {
2254                 struct rbd_device *rbd_dev;
2255
2256                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2257                 if (rbd_id > max_id)
2258                         max_id = rbd_id;
2259         }
2260         spin_unlock(&rbd_dev_list_lock);
2261
2262         /*
2263          * The max id could have been updated by rbd_id_get(), in
2264          * which case it now accurately reflects the new maximum.
2265          * Be careful not to overwrite the maximum value in that
2266          * case.
2267          */
2268         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2269 }
2270
2271 /*
2272  * Skips over white space at *buf, and updates *buf to point to the
2273  * first found non-space character (if any). Returns the length of
2274  * the token (string of non-white space characters) found.  Note
2275  * that *buf must be terminated with '\0'.
2276  */
2277 static inline size_t next_token(const char **buf)
2278 {
2279         /*
2280         * These are the characters that produce nonzero for
2281         * isspace() in the "C" and "POSIX" locales.
2282         */
2283         const char *spaces = " \f\n\r\t\v";
2284
2285         *buf += strspn(*buf, spaces);   /* Find start of token */
2286
2287         return strcspn(*buf, spaces);   /* Return token length */
2288 }
2289
2290 /*
2291  * Finds the next token in *buf, and if the provided token buffer is
2292  * big enough, copies the found token into it.  The result, if
2293  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2294  * must be terminated with '\0' on entry.
2295  *
2296  * Returns the length of the token found (not including the '\0').
2297  * Return value will be 0 if no token is found, and it will be >=
2298  * token_size if the token would not fit.
2299  *
2300  * The *buf pointer will be updated to point beyond the end of the
2301  * found token.  Note that this occurs even if the token buffer is
2302  * too small to hold it.
2303  */
2304 static inline size_t copy_token(const char **buf,
2305                                 char *token,
2306                                 size_t token_size)
2307 {
2308         size_t len;
2309
2310         len = next_token(buf);
2311         if (len < token_size) {
2312                 memcpy(token, *buf, len);
2313                 *(token + len) = '\0';
2314         }
2315         *buf += len;
2316
2317         return len;
2318 }
2319
2320 /*
2321  * Finds the next token in *buf, dynamically allocates a buffer big
2322  * enough to hold a copy of it, and copies the token into the new
2323  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2324  * that a duplicate buffer is created even for a zero-length token.
2325  *
2326  * Returns a pointer to the newly-allocated duplicate, or a null
2327  * pointer if memory for the duplicate was not available.  If
2328  * the lenp argument is a non-null pointer, the length of the token
2329  * (not including the '\0') is returned in *lenp.
2330  *
2331  * If successful, the *buf pointer will be updated to point beyond
2332  * the end of the found token.
2333  *
2334  * Note: uses GFP_KERNEL for allocation.
2335  */
2336 static inline char *dup_token(const char **buf, size_t *lenp)
2337 {
2338         char *dup;
2339         size_t len;
2340
2341         len = next_token(buf);
2342         dup = kmalloc(len + 1, GFP_KERNEL);
2343         if (!dup)
2344                 return NULL;
2345
2346         memcpy(dup, *buf, len);
2347         *(dup + len) = '\0';
2348         *buf += len;
2349
2350         if (lenp)
2351                 *lenp = len;
2352
2353         return dup;
2354 }
2355
2356 /*
2357  * This fills in the pool_name, image_name, image_name_len, snap_name,
2358  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2359  * on the list of monitor addresses and other options provided via
2360  * /sys/bus/rbd/add.
2361  *
2362  * Note: rbd_dev is assumed to have been initially zero-filled.
2363  */
2364 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2365                               const char *buf,
2366                               const char **mon_addrs,
2367                               size_t *mon_addrs_size,
2368                               char *options,
2369                              size_t options_size)
2370 {
2371         size_t len;
2372         int ret;
2373
2374         /* The first four tokens are required */
2375
2376         len = next_token(&buf);
2377         if (!len)
2378                 return -EINVAL;
2379         *mon_addrs_size = len + 1;
2380         *mon_addrs = buf;
2381
2382         buf += len;
2383
2384         len = copy_token(&buf, options, options_size);
2385         if (!len || len >= options_size)
2386                 return -EINVAL;
2387
2388         ret = -ENOMEM;
2389         rbd_dev->pool_name = dup_token(&buf, NULL);
2390         if (!rbd_dev->pool_name)
2391                 goto out_err;
2392
2393         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2394         if (!rbd_dev->image_name)
2395                 goto out_err;
2396
2397         /* Create the name of the header object */
2398
2399         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2400                                                 + sizeof (RBD_SUFFIX),
2401                                         GFP_KERNEL);
2402         if (!rbd_dev->header_name)
2403                 goto out_err;
2404         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2405
2406         /*
2407          * The snapshot name is optional.  If none is is supplied,
2408          * we use the default value.
2409          */
2410         rbd_dev->snap_name = dup_token(&buf, &len);
2411         if (!rbd_dev->snap_name)
2412                 goto out_err;
2413         if (!len) {
2414                 /* Replace the empty name with the default */
2415                 kfree(rbd_dev->snap_name);
2416                 rbd_dev->snap_name
2417                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2418                 if (!rbd_dev->snap_name)
2419                         goto out_err;
2420
2421                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2422                         sizeof (RBD_SNAP_HEAD_NAME));
2423         }
2424
2425         return 0;
2426
2427 out_err:
2428         kfree(rbd_dev->header_name);
2429         rbd_dev->header_name = NULL;
2430         kfree(rbd_dev->image_name);
2431         rbd_dev->image_name = NULL;
2432         rbd_dev->image_name_len = 0;
2433         kfree(rbd_dev->pool_name);
2434         rbd_dev->pool_name = NULL;
2435
2436         return ret;
2437 }
2438
2439 static ssize_t rbd_add(struct bus_type *bus,
2440                        const char *buf,
2441                        size_t count)
2442 {
2443         char *options;
2444         struct rbd_device *rbd_dev = NULL;
2445         const char *mon_addrs = NULL;
2446         size_t mon_addrs_size = 0;
2447         struct ceph_osd_client *osdc;
2448         int rc = -ENOMEM;
2449
2450         if (!try_module_get(THIS_MODULE))
2451                 return -ENODEV;
2452
2453         options = kmalloc(count, GFP_KERNEL);
2454         if (!options)
2455                 goto err_nomem;
2456         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2457         if (!rbd_dev)
2458                 goto err_nomem;
2459
2460         /* static rbd_device initialization */
2461         spin_lock_init(&rbd_dev->lock);
2462         INIT_LIST_HEAD(&rbd_dev->node);
2463         INIT_LIST_HEAD(&rbd_dev->snaps);
2464         init_rwsem(&rbd_dev->header_rwsem);
2465
2466         /* generate unique id: find highest unique id, add one */
2467         rbd_id_get(rbd_dev);
2468
2469         /* Fill in the device name, now that we have its id. */
2470         BUILD_BUG_ON(DEV_NAME_LEN
2471                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2472         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2473
2474         /* parse add command */
2475         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2476                                 options, count);
2477         if (rc)
2478                 goto err_put_id;
2479
2480         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2481                                                 options);
2482         if (IS_ERR(rbd_dev->rbd_client)) {
2483                 rc = PTR_ERR(rbd_dev->rbd_client);
2484                 rbd_dev->rbd_client = NULL;
2485                 goto err_put_id;
2486         }
2487
2488         /* pick the pool */
2489         osdc = &rbd_dev->rbd_client->client->osdc;
2490         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2491         if (rc < 0)
2492                 goto err_out_client;
2493         rbd_dev->pool_id = rc;
2494
2495         /* register our block device */
2496         rc = register_blkdev(0, rbd_dev->name);
2497         if (rc < 0)
2498                 goto err_out_client;
2499         rbd_dev->major = rc;
2500
2501         rc = rbd_bus_add_dev(rbd_dev);
2502         if (rc)
2503                 goto err_out_blkdev;
2504
2505         /*
2506          * At this point cleanup in the event of an error is the job
2507          * of the sysfs code (initiated by rbd_bus_del_dev()).
2508          *
2509          * Set up and announce blkdev mapping.
2510          */
2511         rc = rbd_init_disk(rbd_dev);
2512         if (rc)
2513                 goto err_out_bus;
2514
2515         rc = rbd_init_watch_dev(rbd_dev);
2516         if (rc)
2517                 goto err_out_bus;
2518
2519         return count;
2520
2521 err_out_bus:
2522         /* this will also clean up rest of rbd_dev stuff */
2523
2524         rbd_bus_del_dev(rbd_dev);
2525         kfree(options);
2526         return rc;
2527
2528 err_out_blkdev:
2529         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2530 err_out_client:
2531         rbd_put_client(rbd_dev);
2532 err_put_id:
2533         if (rbd_dev->pool_name) {
2534                 kfree(rbd_dev->snap_name);
2535                 kfree(rbd_dev->header_name);
2536                 kfree(rbd_dev->image_name);
2537                 kfree(rbd_dev->pool_name);
2538         }
2539         rbd_id_put(rbd_dev);
2540 err_nomem:
2541         kfree(rbd_dev);
2542         kfree(options);
2543
2544         dout("Error adding device %s\n", buf);
2545         module_put(THIS_MODULE);
2546
2547         return (ssize_t) rc;
2548 }
2549
2550 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2551 {
2552         struct list_head *tmp;
2553         struct rbd_device *rbd_dev;
2554
2555         spin_lock(&rbd_dev_list_lock);
2556         list_for_each(tmp, &rbd_dev_list) {
2557                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2558                 if (rbd_dev->dev_id == dev_id) {
2559                         spin_unlock(&rbd_dev_list_lock);
2560                         return rbd_dev;
2561                 }
2562         }
2563         spin_unlock(&rbd_dev_list_lock);
2564         return NULL;
2565 }
2566
2567 static void rbd_dev_release(struct device *dev)
2568 {
2569         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2570
2571         if (rbd_dev->watch_request) {
2572                 struct ceph_client *client = rbd_dev->rbd_client->client;
2573
2574                 ceph_osdc_unregister_linger_request(&client->osdc,
2575                                                     rbd_dev->watch_request);
2576         }
2577         if (rbd_dev->watch_event)
2578                 rbd_req_sync_unwatch(rbd_dev);
2579
2580         rbd_put_client(rbd_dev);
2581
2582         /* clean up and free blkdev */
2583         rbd_free_disk(rbd_dev);
2584         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2585
2586         /* done with the id, and with the rbd_dev */
2587         kfree(rbd_dev->snap_name);
2588         kfree(rbd_dev->header_name);
2589         kfree(rbd_dev->pool_name);
2590         kfree(rbd_dev->image_name);
2591         rbd_id_put(rbd_dev);
2592         kfree(rbd_dev);
2593
2594         /* release module ref */
2595         module_put(THIS_MODULE);
2596 }
2597
2598 static ssize_t rbd_remove(struct bus_type *bus,
2599                           const char *buf,
2600                           size_t count)
2601 {
2602         struct rbd_device *rbd_dev = NULL;
2603         int target_id, rc;
2604         unsigned long ul;
2605         int ret = count;
2606
2607         rc = strict_strtoul(buf, 10, &ul);
2608         if (rc)
2609                 return rc;
2610
2611         /* convert to int; abort if we lost anything in the conversion */
2612         target_id = (int) ul;
2613         if (target_id != ul)
2614                 return -EINVAL;
2615
2616         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2617
2618         rbd_dev = __rbd_get_dev(target_id);
2619         if (!rbd_dev) {
2620                 ret = -ENOENT;
2621                 goto done;
2622         }
2623
2624         __rbd_remove_all_snaps(rbd_dev);
2625         rbd_bus_del_dev(rbd_dev);
2626
2627 done:
2628         mutex_unlock(&ctl_mutex);
2629         return ret;
2630 }
2631
2632 static ssize_t rbd_snap_add(struct device *dev,
2633                             struct device_attribute *attr,
2634                             const char *buf,
2635                             size_t count)
2636 {
2637         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2638         int ret;
2639         char *name = kmalloc(count + 1, GFP_KERNEL);
2640         if (!name)
2641                 return -ENOMEM;
2642
2643         snprintf(name, count, "%s", buf);
2644
2645         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2646
2647         ret = rbd_header_add_snap(rbd_dev,
2648                                   name, GFP_KERNEL);
2649         if (ret < 0)
2650                 goto err_unlock;
2651
2652         ret = __rbd_refresh_header(rbd_dev, NULL);
2653         if (ret < 0)
2654                 goto err_unlock;
2655
2656         /* shouldn't hold ctl_mutex when notifying.. notify might
2657            trigger a watch callback that would need to get that mutex */
2658         mutex_unlock(&ctl_mutex);
2659
2660         /* make a best effort, don't error if failed */
2661         rbd_req_sync_notify(rbd_dev);
2662
2663         ret = count;
2664         kfree(name);
2665         return ret;
2666
2667 err_unlock:
2668         mutex_unlock(&ctl_mutex);
2669         kfree(name);
2670         return ret;
2671 }
2672
2673 /*
2674  * create control files in sysfs
2675  * /sys/bus/rbd/...
2676  */
2677 static int rbd_sysfs_init(void)
2678 {
2679         int ret;
2680
2681         ret = device_register(&rbd_root_dev);
2682         if (ret < 0)
2683                 return ret;
2684
2685         ret = bus_register(&rbd_bus_type);
2686         if (ret < 0)
2687                 device_unregister(&rbd_root_dev);
2688
2689         return ret;
2690 }
2691
2692 static void rbd_sysfs_cleanup(void)
2693 {
2694         bus_unregister(&rbd_bus_type);
2695         device_unregister(&rbd_root_dev);
2696 }
2697
2698 int __init rbd_init(void)
2699 {
2700         int rc;
2701
2702         rc = rbd_sysfs_init();
2703         if (rc)
2704                 return rc;
2705         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2706         return 0;
2707 }
2708
2709 void __exit rbd_exit(void)
2710 {
2711         rbd_sysfs_cleanup();
2712 }
2713
2714 module_init(rbd_init);
2715 module_exit(rbd_exit);
2716
2717 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2718 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2719 MODULE_DESCRIPTION("rados block device");
2720
2721 /* following authorship retained from original osdblk.c */
2722 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2723
2724 MODULE_LICENSE("GPL");