rbd: option symbol renames
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u64 snap_seq;
86         u32 total_snaps;
87
88         char *snap_names;
89         u64 *snap_sizes;
90
91         u64 obj_version;
92 };
93
94 struct rbd_options {
95         int     notify_timeout;
96 };
97
98 /*
99  * an instance of the client.  multiple devices may share an rbd client.
100  */
101 struct rbd_client {
102         struct ceph_client      *client;
103         struct rbd_options      *rbd_opts;
104         struct kref             kref;
105         struct list_head        node;
106 };
107
108 /*
109  * a request completion status
110  */
111 struct rbd_req_status {
112         int done;
113         int rc;
114         u64 bytes;
115 };
116
117 /*
118  * a collection of requests
119  */
120 struct rbd_req_coll {
121         int                     total;
122         int                     num_done;
123         struct kref             kref;
124         struct rbd_req_status   status[0];
125 };
126
127 /*
128  * a single io request
129  */
130 struct rbd_request {
131         struct request          *rq;            /* blk layer request */
132         struct bio              *bio;           /* cloned bio */
133         struct page             **pages;        /* list of used pages */
134         u64                     len;
135         int                     coll_index;
136         struct rbd_req_coll     *coll;
137 };
138
139 struct rbd_snap {
140         struct  device          dev;
141         const char              *name;
142         u64                     size;
143         struct list_head        node;
144         u64                     id;
145 };
146
147 /*
148  * a single device
149  */
150 struct rbd_device {
151         int                     id;             /* blkdev unique id */
152
153         int                     major;          /* blkdev assigned major */
154         struct gendisk          *disk;          /* blkdev's gendisk and rq */
155         struct request_queue    *q;
156
157         struct rbd_client       *rbd_client;
158
159         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161         spinlock_t              lock;           /* queue lock */
162
163         struct rbd_image_header header;
164         char                    *image_name;
165         size_t                  image_name_len;
166         char                    *header_name;
167         char                    *pool_name;
168         int                     pool_id;
169
170         struct ceph_osd_event   *watch_event;
171         struct ceph_osd_request *watch_request;
172
173         /* protects updating the header */
174         struct rw_semaphore     header_rwsem;
175         char                    *snap_name;
176         u64                     snap_id;        /* current snapshot id */
177         int read_only;
178
179         struct list_head        node;
180
181         /* list of snapshots */
182         struct list_head        snaps;
183
184         /* sysfs related */
185         struct device           dev;
186 };
187
188 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
189
190 static LIST_HEAD(rbd_dev_list);    /* devices */
191 static DEFINE_SPINLOCK(rbd_dev_list_lock);
192
193 static LIST_HEAD(rbd_client_list);              /* clients */
194 static DEFINE_SPINLOCK(rbd_client_list_lock);
195
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199                             struct device_attribute *attr,
200                             const char *buf,
201                             size_t count);
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203                                   struct rbd_snap *snap);
204
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
206                        size_t count);
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
208                           size_t count);
209
210 static struct bus_attribute rbd_bus_attrs[] = {
211         __ATTR(add, S_IWUSR, NULL, rbd_add),
212         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
213         __ATTR_NULL
214 };
215
216 static struct bus_type rbd_bus_type = {
217         .name           = "rbd",
218         .bus_attrs      = rbd_bus_attrs,
219 };
220
221 static void rbd_root_dev_release(struct device *dev)
222 {
223 }
224
225 static struct device rbd_root_dev = {
226         .init_name =    "rbd",
227         .release =      rbd_root_dev_release,
228 };
229
230
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
232 {
233         return get_device(&rbd_dev->dev);
234 }
235
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
237 {
238         put_device(&rbd_dev->dev);
239 }
240
241 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
242
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
244 {
245         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
246
247         rbd_get_dev(rbd_dev);
248
249         set_device_ro(bdev, rbd_dev->read_only);
250
251         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252                 return -EROFS;
253
254         return 0;
255 }
256
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
258 {
259         struct rbd_device *rbd_dev = disk->private_data;
260
261         rbd_put_dev(rbd_dev);
262
263         return 0;
264 }
265
266 static const struct block_device_operations rbd_bd_ops = {
267         .owner                  = THIS_MODULE,
268         .open                   = rbd_open,
269         .release                = rbd_release,
270 };
271
272 /*
273  * Initialize an rbd client instance.
274  * We own *ceph_opts.
275  */
276 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
277                                             struct rbd_options *rbd_opts)
278 {
279         struct rbd_client *rbdc;
280         int ret = -ENOMEM;
281
282         dout("rbd_client_create\n");
283         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
284         if (!rbdc)
285                 goto out_opt;
286
287         kref_init(&rbdc->kref);
288         INIT_LIST_HEAD(&rbdc->node);
289
290         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291
292         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
293         if (IS_ERR(rbdc->client))
294                 goto out_mutex;
295         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
296
297         ret = ceph_open_session(rbdc->client);
298         if (ret < 0)
299                 goto out_err;
300
301         rbdc->rbd_opts = rbd_opts;
302
303         spin_lock(&rbd_client_list_lock);
304         list_add_tail(&rbdc->node, &rbd_client_list);
305         spin_unlock(&rbd_client_list_lock);
306
307         mutex_unlock(&ctl_mutex);
308
309         dout("rbd_client_create created %p\n", rbdc);
310         return rbdc;
311
312 out_err:
313         ceph_destroy_client(rbdc->client);
314 out_mutex:
315         mutex_unlock(&ctl_mutex);
316         kfree(rbdc);
317 out_opt:
318         if (ceph_opts)
319                 ceph_destroy_options(ceph_opts);
320         return ERR_PTR(ret);
321 }
322
323 /*
324  * Find a ceph client with specific addr and configuration.
325  */
326 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
327 {
328         struct rbd_client *client_node;
329
330         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
331                 return NULL;
332
333         list_for_each_entry(client_node, &rbd_client_list, node)
334                 if (!ceph_compare_options(ceph_opts, client_node->client))
335                         return client_node;
336         return NULL;
337 }
338
339 /*
340  * mount options
341  */
342 enum {
343         Opt_notify_timeout,
344         Opt_last_int,
345         /* int args above */
346         Opt_last_string,
347         /* string args above */
348 };
349
350 static match_table_t rbd_opts_tokens = {
351         {Opt_notify_timeout, "notify_timeout=%d"},
352         /* int args above */
353         /* string args above */
354         {-1, NULL}
355 };
356
357 static int parse_rbd_opts_token(char *c, void *private)
358 {
359         struct rbd_options *rbd_opts = private;
360         substring_t argstr[MAX_OPT_ARGS];
361         int token, intval, ret;
362
363         token = match_token(c, rbd_opts_tokens, argstr);
364         if (token < 0)
365                 return -EINVAL;
366
367         if (token < Opt_last_int) {
368                 ret = match_int(&argstr[0], &intval);
369                 if (ret < 0) {
370                         pr_err("bad mount option arg (not int) "
371                                "at '%s'\n", c);
372                         return ret;
373                 }
374                 dout("got int token %d val %d\n", token, intval);
375         } else if (token > Opt_last_int && token < Opt_last_string) {
376                 dout("got string token %d val %s\n", token,
377                      argstr[0].from);
378         } else {
379                 dout("got token %d\n", token);
380         }
381
382         switch (token) {
383         case Opt_notify_timeout:
384                 rbd_opts->notify_timeout = intval;
385                 break;
386         default:
387                 BUG_ON(token);
388         }
389         return 0;
390 }
391
392 /*
393  * Get a ceph client with specific addr and configuration, if one does
394  * not exist create it.
395  */
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
397                                          size_t mon_addr_len,
398                                          char *options)
399 {
400         struct rbd_client *rbdc;
401         struct ceph_options *ceph_opts;
402         struct rbd_options *rbd_opts;
403
404         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
405         if (!rbd_opts)
406                 return ERR_PTR(-ENOMEM);
407
408         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
409
410         ceph_opts = ceph_parse_options(options, mon_addr,
411                                         mon_addr + mon_addr_len,
412                                         parse_rbd_opts_token, rbd_opts);
413         if (IS_ERR(ceph_opts)) {
414                 kfree(rbd_opts);
415                 return ERR_CAST(ceph_opts);
416         }
417
418         spin_lock(&rbd_client_list_lock);
419         rbdc = __rbd_client_find(ceph_opts);
420         if (rbdc) {
421                 /* using an existing client */
422                 kref_get(&rbdc->kref);
423                 spin_unlock(&rbd_client_list_lock);
424
425                 ceph_destroy_options(ceph_opts);
426                 kfree(rbd_opts);
427
428                 return rbdc;
429         }
430         spin_unlock(&rbd_client_list_lock);
431
432         rbdc = rbd_client_create(ceph_opts, rbd_opts);
433
434         if (IS_ERR(rbdc))
435                 kfree(rbd_opts);
436
437         return rbdc;
438 }
439
440 /*
441  * Destroy ceph client
442  *
443  * Caller must hold rbd_client_list_lock.
444  */
445 static void rbd_client_release(struct kref *kref)
446 {
447         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
448
449         dout("rbd_release_client %p\n", rbdc);
450         spin_lock(&rbd_client_list_lock);
451         list_del(&rbdc->node);
452         spin_unlock(&rbd_client_list_lock);
453
454         ceph_destroy_client(rbdc->client);
455         kfree(rbdc->rbd_opts);
456         kfree(rbdc);
457 }
458
459 /*
460  * Drop reference to ceph client node. If it's not referenced anymore, release
461  * it.
462  */
463 static void rbd_put_client(struct rbd_device *rbd_dev)
464 {
465         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466         rbd_dev->rbd_client = NULL;
467 }
468
469 /*
470  * Destroy requests collection
471  */
472 static void rbd_coll_release(struct kref *kref)
473 {
474         struct rbd_req_coll *coll =
475                 container_of(kref, struct rbd_req_coll, kref);
476
477         dout("rbd_coll_release %p\n", coll);
478         kfree(coll);
479 }
480
481 /*
482  * Create a new header structure, translate header format from the on-disk
483  * header.
484  */
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486                                  struct rbd_image_header_ondisk *ondisk,
487                                  u32 allocated_snaps,
488                                  gfp_t gfp_flags)
489 {
490         u32 i, snap_count;
491
492         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
493                 return -ENXIO;
494
495         snap_count = le32_to_cpu(ondisk->snap_count);
496         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
497                          / sizeof (*ondisk))
498                 return -EINVAL;
499         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500                                 snap_count * sizeof(u64),
501                                 gfp_flags);
502         if (!header->snapc)
503                 return -ENOMEM;
504
505         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
506         if (snap_count) {
507                 header->snap_names = kmalloc(header->snap_names_len,
508                                              gfp_flags);
509                 if (!header->snap_names)
510                         goto err_snapc;
511                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
512                                              gfp_flags);
513                 if (!header->snap_sizes)
514                         goto err_names;
515         } else {
516                 header->snap_names = NULL;
517                 header->snap_sizes = NULL;
518         }
519
520         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
521                                         gfp_flags);
522         if (!header->object_prefix)
523                 goto err_sizes;
524
525         memcpy(header->object_prefix, ondisk->block_name,
526                sizeof(ondisk->block_name));
527         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
528
529         header->image_size = le64_to_cpu(ondisk->image_size);
530         header->obj_order = ondisk->options.order;
531         header->crypt_type = ondisk->options.crypt_type;
532         header->comp_type = ondisk->options.comp_type;
533
534         atomic_set(&header->snapc->nref, 1);
535         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536         header->snapc->num_snaps = snap_count;
537         header->total_snaps = snap_count;
538
539         if (snap_count && allocated_snaps == snap_count) {
540                 for (i = 0; i < snap_count; i++) {
541                         header->snapc->snaps[i] =
542                                 le64_to_cpu(ondisk->snaps[i].id);
543                         header->snap_sizes[i] =
544                                 le64_to_cpu(ondisk->snaps[i].image_size);
545                 }
546
547                 /* copy snapshot names */
548                 memcpy(header->snap_names, &ondisk->snaps[i],
549                         header->snap_names_len);
550         }
551
552         return 0;
553
554 err_sizes:
555         kfree(header->snap_sizes);
556 err_names:
557         kfree(header->snap_names);
558 err_snapc:
559         kfree(header->snapc);
560         return -ENOMEM;
561 }
562
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
564                         u64 *seq, u64 *size)
565 {
566         int i;
567         char *p = header->snap_names;
568
569         for (i = 0; i < header->total_snaps; i++) {
570                 if (!strcmp(snap_name, p)) {
571
572                         /* Found it.  Pass back its id and/or size */
573
574                         if (seq)
575                                 *seq = header->snapc->snaps[i];
576                         if (size)
577                                 *size = header->snap_sizes[i];
578                         return i;
579                 }
580                 p += strlen(p) + 1;     /* Skip ahead to the next name */
581         }
582         return -ENOENT;
583 }
584
585 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
586 {
587         struct rbd_image_header *header = &rbd_dev->header;
588         struct ceph_snap_context *snapc = header->snapc;
589         int ret = -ENOENT;
590
591         down_write(&rbd_dev->header_rwsem);
592
593         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
594                     sizeof (RBD_SNAP_HEAD_NAME))) {
595                 if (header->total_snaps)
596                         snapc->seq = header->snap_seq;
597                 else
598                         snapc->seq = 0;
599                 rbd_dev->snap_id = CEPH_NOSNAP;
600                 rbd_dev->read_only = 0;
601                 if (size)
602                         *size = header->image_size;
603         } else {
604                 ret = snap_by_name(header, rbd_dev->snap_name,
605                                         &snapc->seq, size);
606                 if (ret < 0)
607                         goto done;
608                 rbd_dev->snap_id = snapc->seq;
609                 rbd_dev->read_only = 1;
610         }
611
612         ret = 0;
613 done:
614         up_write(&rbd_dev->header_rwsem);
615         return ret;
616 }
617
618 static void rbd_header_free(struct rbd_image_header *header)
619 {
620         kfree(header->object_prefix);
621         kfree(header->snap_sizes);
622         kfree(header->snap_names);
623         kfree(header->snapc);
624 }
625
626 /*
627  * get the actual striped segment name, offset and length
628  */
629 static u64 rbd_get_segment(struct rbd_image_header *header,
630                            const char *object_prefix,
631                            u64 ofs, u64 len,
632                            char *seg_name, u64 *segofs)
633 {
634         u64 seg = ofs >> header->obj_order;
635
636         if (seg_name)
637                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
638                          "%s.%012llx", object_prefix, seg);
639
640         ofs = ofs & ((1 << header->obj_order) - 1);
641         len = min_t(u64, len, (1 << header->obj_order) - ofs);
642
643         if (segofs)
644                 *segofs = ofs;
645
646         return len;
647 }
648
649 static int rbd_get_num_segments(struct rbd_image_header *header,
650                                 u64 ofs, u64 len)
651 {
652         u64 start_seg = ofs >> header->obj_order;
653         u64 end_seg = (ofs + len - 1) >> header->obj_order;
654         return end_seg - start_seg + 1;
655 }
656
657 /*
658  * returns the size of an object in the image
659  */
660 static u64 rbd_obj_bytes(struct rbd_image_header *header)
661 {
662         return 1 << header->obj_order;
663 }
664
665 /*
666  * bio helpers
667  */
668
669 static void bio_chain_put(struct bio *chain)
670 {
671         struct bio *tmp;
672
673         while (chain) {
674                 tmp = chain;
675                 chain = chain->bi_next;
676                 bio_put(tmp);
677         }
678 }
679
680 /*
681  * zeros a bio chain, starting at specific offset
682  */
683 static void zero_bio_chain(struct bio *chain, int start_ofs)
684 {
685         struct bio_vec *bv;
686         unsigned long flags;
687         void *buf;
688         int i;
689         int pos = 0;
690
691         while (chain) {
692                 bio_for_each_segment(bv, chain, i) {
693                         if (pos + bv->bv_len > start_ofs) {
694                                 int remainder = max(start_ofs - pos, 0);
695                                 buf = bvec_kmap_irq(bv, &flags);
696                                 memset(buf + remainder, 0,
697                                        bv->bv_len - remainder);
698                                 bvec_kunmap_irq(buf, &flags);
699                         }
700                         pos += bv->bv_len;
701                 }
702
703                 chain = chain->bi_next;
704         }
705 }
706
707 /*
708  * bio_chain_clone - clone a chain of bios up to a certain length.
709  * might return a bio_pair that will need to be released.
710  */
711 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
712                                    struct bio_pair **bp,
713                                    int len, gfp_t gfpmask)
714 {
715         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
716         int total = 0;
717
718         if (*bp) {
719                 bio_pair_release(*bp);
720                 *bp = NULL;
721         }
722
723         while (old_chain && (total < len)) {
724                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
725                 if (!tmp)
726                         goto err_out;
727
728                 if (total + old_chain->bi_size > len) {
729                         struct bio_pair *bp;
730
731                         /*
732                          * this split can only happen with a single paged bio,
733                          * split_bio will BUG_ON if this is not the case
734                          */
735                         dout("bio_chain_clone split! total=%d remaining=%d"
736                              "bi_size=%d\n",
737                              (int)total, (int)len-total,
738                              (int)old_chain->bi_size);
739
740                         /* split the bio. We'll release it either in the next
741                            call, or it will have to be released outside */
742                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
743                         if (!bp)
744                                 goto err_out;
745
746                         __bio_clone(tmp, &bp->bio1);
747
748                         *next = &bp->bio2;
749                 } else {
750                         __bio_clone(tmp, old_chain);
751                         *next = old_chain->bi_next;
752                 }
753
754                 tmp->bi_bdev = NULL;
755                 gfpmask &= ~__GFP_WAIT;
756                 tmp->bi_next = NULL;
757
758                 if (!new_chain) {
759                         new_chain = tail = tmp;
760                 } else {
761                         tail->bi_next = tmp;
762                         tail = tmp;
763                 }
764                 old_chain = old_chain->bi_next;
765
766                 total += tmp->bi_size;
767         }
768
769         BUG_ON(total < len);
770
771         if (tail)
772                 tail->bi_next = NULL;
773
774         *old = old_chain;
775
776         return new_chain;
777
778 err_out:
779         dout("bio_chain_clone with err\n");
780         bio_chain_put(new_chain);
781         return NULL;
782 }
783
784 /*
785  * helpers for osd request op vectors.
786  */
787 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
788                             int num_ops,
789                             int opcode,
790                             u32 payload_len)
791 {
792         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
793                        GFP_NOIO);
794         if (!*ops)
795                 return -ENOMEM;
796         (*ops)[0].op = opcode;
797         /*
798          * op extent offset and length will be set later on
799          * in calc_raw_layout()
800          */
801         (*ops)[0].payload_len = payload_len;
802         return 0;
803 }
804
805 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
806 {
807         kfree(ops);
808 }
809
810 static void rbd_coll_end_req_index(struct request *rq,
811                                    struct rbd_req_coll *coll,
812                                    int index,
813                                    int ret, u64 len)
814 {
815         struct request_queue *q;
816         int min, max, i;
817
818         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
819              coll, index, ret, len);
820
821         if (!rq)
822                 return;
823
824         if (!coll) {
825                 blk_end_request(rq, ret, len);
826                 return;
827         }
828
829         q = rq->q;
830
831         spin_lock_irq(q->queue_lock);
832         coll->status[index].done = 1;
833         coll->status[index].rc = ret;
834         coll->status[index].bytes = len;
835         max = min = coll->num_done;
836         while (max < coll->total && coll->status[max].done)
837                 max++;
838
839         for (i = min; i<max; i++) {
840                 __blk_end_request(rq, coll->status[i].rc,
841                                   coll->status[i].bytes);
842                 coll->num_done++;
843                 kref_put(&coll->kref, rbd_coll_release);
844         }
845         spin_unlock_irq(q->queue_lock);
846 }
847
848 static void rbd_coll_end_req(struct rbd_request *req,
849                              int ret, u64 len)
850 {
851         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
852 }
853
854 /*
855  * Send ceph osd request
856  */
857 static int rbd_do_request(struct request *rq,
858                           struct rbd_device *rbd_dev,
859                           struct ceph_snap_context *snapc,
860                           u64 snapid,
861                           const char *object_name, u64 ofs, u64 len,
862                           struct bio *bio,
863                           struct page **pages,
864                           int num_pages,
865                           int flags,
866                           struct ceph_osd_req_op *ops,
867                           int num_reply,
868                           struct rbd_req_coll *coll,
869                           int coll_index,
870                           void (*rbd_cb)(struct ceph_osd_request *req,
871                                          struct ceph_msg *msg),
872                           struct ceph_osd_request **linger_req,
873                           u64 *ver)
874 {
875         struct ceph_osd_request *req;
876         struct ceph_file_layout *layout;
877         int ret;
878         u64 bno;
879         struct timespec mtime = CURRENT_TIME;
880         struct rbd_request *req_data;
881         struct ceph_osd_request_head *reqhead;
882         struct ceph_osd_client *osdc;
883
884         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
885         if (!req_data) {
886                 if (coll)
887                         rbd_coll_end_req_index(rq, coll, coll_index,
888                                                -ENOMEM, len);
889                 return -ENOMEM;
890         }
891
892         if (coll) {
893                 req_data->coll = coll;
894                 req_data->coll_index = coll_index;
895         }
896
897         dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
898                 object_name, len, ofs);
899
900         down_read(&rbd_dev->header_rwsem);
901
902         osdc = &rbd_dev->rbd_client->client->osdc;
903         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
904                                         false, GFP_NOIO, pages, bio);
905         if (!req) {
906                 up_read(&rbd_dev->header_rwsem);
907                 ret = -ENOMEM;
908                 goto done_pages;
909         }
910
911         req->r_callback = rbd_cb;
912
913         req_data->rq = rq;
914         req_data->bio = bio;
915         req_data->pages = pages;
916         req_data->len = len;
917
918         req->r_priv = req_data;
919
920         reqhead = req->r_request->front.iov_base;
921         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
922
923         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
924         req->r_oid_len = strlen(req->r_oid);
925
926         layout = &req->r_file_layout;
927         memset(layout, 0, sizeof(*layout));
928         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929         layout->fl_stripe_count = cpu_to_le32(1);
930         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
931         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
932         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
933                                 req, ops);
934
935         ceph_osdc_build_request(req, ofs, &len,
936                                 ops,
937                                 snapc,
938                                 &mtime,
939                                 req->r_oid, req->r_oid_len);
940         up_read(&rbd_dev->header_rwsem);
941
942         if (linger_req) {
943                 ceph_osdc_set_request_linger(osdc, req);
944                 *linger_req = req;
945         }
946
947         ret = ceph_osdc_start_request(osdc, req, false);
948         if (ret < 0)
949                 goto done_err;
950
951         if (!rbd_cb) {
952                 ret = ceph_osdc_wait_request(osdc, req);
953                 if (ver)
954                         *ver = le64_to_cpu(req->r_reassert_version.version);
955                 dout("reassert_ver=%lld\n",
956                      le64_to_cpu(req->r_reassert_version.version));
957                 ceph_osdc_put_request(req);
958         }
959         return ret;
960
961 done_err:
962         bio_chain_put(req_data->bio);
963         ceph_osdc_put_request(req);
964 done_pages:
965         rbd_coll_end_req(req_data, ret, len);
966         kfree(req_data);
967         return ret;
968 }
969
970 /*
971  * Ceph osd op callback
972  */
973 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
974 {
975         struct rbd_request *req_data = req->r_priv;
976         struct ceph_osd_reply_head *replyhead;
977         struct ceph_osd_op *op;
978         __s32 rc;
979         u64 bytes;
980         int read_op;
981
982         /* parse reply */
983         replyhead = msg->front.iov_base;
984         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
985         op = (void *)(replyhead + 1);
986         rc = le32_to_cpu(replyhead->result);
987         bytes = le64_to_cpu(op->extent.length);
988         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
989
990         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
991
992         if (rc == -ENOENT && read_op) {
993                 zero_bio_chain(req_data->bio, 0);
994                 rc = 0;
995         } else if (rc == 0 && read_op && bytes < req_data->len) {
996                 zero_bio_chain(req_data->bio, bytes);
997                 bytes = req_data->len;
998         }
999
1000         rbd_coll_end_req(req_data, rc, bytes);
1001
1002         if (req_data->bio)
1003                 bio_chain_put(req_data->bio);
1004
1005         ceph_osdc_put_request(req);
1006         kfree(req_data);
1007 }
1008
1009 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1010 {
1011         ceph_osdc_put_request(req);
1012 }
1013
1014 /*
1015  * Do a synchronous ceph osd operation
1016  */
1017 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1018                            struct ceph_snap_context *snapc,
1019                            u64 snapid,
1020                            int opcode,
1021                            int flags,
1022                            struct ceph_osd_req_op *orig_ops,
1023                            int num_reply,
1024                            const char *object_name,
1025                            u64 ofs, u64 len,
1026                            char *buf,
1027                            struct ceph_osd_request **linger_req,
1028                            u64 *ver)
1029 {
1030         int ret;
1031         struct page **pages;
1032         int num_pages;
1033         struct ceph_osd_req_op *ops = orig_ops;
1034         u32 payload_len;
1035
1036         num_pages = calc_pages_for(ofs , len);
1037         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1038         if (IS_ERR(pages))
1039                 return PTR_ERR(pages);
1040
1041         if (!orig_ops) {
1042                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1043                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1044                 if (ret < 0)
1045                         goto done;
1046
1047                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1048                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1049                         if (ret < 0)
1050                                 goto done_ops;
1051                 }
1052         }
1053
1054         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1055                           object_name, ofs, len, NULL,
1056                           pages, num_pages,
1057                           flags,
1058                           ops,
1059                           2,
1060                           NULL, 0,
1061                           NULL,
1062                           linger_req, ver);
1063         if (ret < 0)
1064                 goto done_ops;
1065
1066         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1067                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1068
1069 done_ops:
1070         if (!orig_ops)
1071                 rbd_destroy_ops(ops);
1072 done:
1073         ceph_release_page_vector(pages, num_pages);
1074         return ret;
1075 }
1076
1077 /*
1078  * Do an asynchronous ceph osd operation
1079  */
1080 static int rbd_do_op(struct request *rq,
1081                      struct rbd_device *rbd_dev,
1082                      struct ceph_snap_context *snapc,
1083                      u64 snapid,
1084                      int opcode, int flags, int num_reply,
1085                      u64 ofs, u64 len,
1086                      struct bio *bio,
1087                      struct rbd_req_coll *coll,
1088                      int coll_index)
1089 {
1090         char *seg_name;
1091         u64 seg_ofs;
1092         u64 seg_len;
1093         int ret;
1094         struct ceph_osd_req_op *ops;
1095         u32 payload_len;
1096
1097         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1098         if (!seg_name)
1099                 return -ENOMEM;
1100
1101         seg_len = rbd_get_segment(&rbd_dev->header,
1102                                   rbd_dev->header.object_prefix,
1103                                   ofs, len,
1104                                   seg_name, &seg_ofs);
1105
1106         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1107
1108         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1109         if (ret < 0)
1110                 goto done;
1111
1112         /* we've taken care of segment sizes earlier when we
1113            cloned the bios. We should never have a segment
1114            truncated at this point */
1115         BUG_ON(seg_len < len);
1116
1117         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1118                              seg_name, seg_ofs, seg_len,
1119                              bio,
1120                              NULL, 0,
1121                              flags,
1122                              ops,
1123                              num_reply,
1124                              coll, coll_index,
1125                              rbd_req_cb, 0, NULL);
1126
1127         rbd_destroy_ops(ops);
1128 done:
1129         kfree(seg_name);
1130         return ret;
1131 }
1132
1133 /*
1134  * Request async osd write
1135  */
1136 static int rbd_req_write(struct request *rq,
1137                          struct rbd_device *rbd_dev,
1138                          struct ceph_snap_context *snapc,
1139                          u64 ofs, u64 len,
1140                          struct bio *bio,
1141                          struct rbd_req_coll *coll,
1142                          int coll_index)
1143 {
1144         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1145                          CEPH_OSD_OP_WRITE,
1146                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1147                          2,
1148                          ofs, len, bio, coll, coll_index);
1149 }
1150
1151 /*
1152  * Request async osd read
1153  */
1154 static int rbd_req_read(struct request *rq,
1155                          struct rbd_device *rbd_dev,
1156                          u64 snapid,
1157                          u64 ofs, u64 len,
1158                          struct bio *bio,
1159                          struct rbd_req_coll *coll,
1160                          int coll_index)
1161 {
1162         return rbd_do_op(rq, rbd_dev, NULL,
1163                          snapid,
1164                          CEPH_OSD_OP_READ,
1165                          CEPH_OSD_FLAG_READ,
1166                          2,
1167                          ofs, len, bio, coll, coll_index);
1168 }
1169
1170 /*
1171  * Request sync osd read
1172  */
1173 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1174                           struct ceph_snap_context *snapc,
1175                           u64 snapid,
1176                           const char *object_name,
1177                           u64 ofs, u64 len,
1178                           char *buf,
1179                           u64 *ver)
1180 {
1181         return rbd_req_sync_op(rbd_dev, NULL,
1182                                snapid,
1183                                CEPH_OSD_OP_READ,
1184                                CEPH_OSD_FLAG_READ,
1185                                NULL,
1186                                1, object_name, ofs, len, buf, NULL, ver);
1187 }
1188
1189 /*
1190  * Request sync osd watch
1191  */
1192 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1193                                    u64 ver,
1194                                    u64 notify_id,
1195                                    const char *object_name)
1196 {
1197         struct ceph_osd_req_op *ops;
1198         int ret;
1199
1200         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1201         if (ret < 0)
1202                 return ret;
1203
1204         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1205         ops[0].watch.cookie = notify_id;
1206         ops[0].watch.flag = 0;
1207
1208         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1209                           object_name, 0, 0, NULL,
1210                           NULL, 0,
1211                           CEPH_OSD_FLAG_READ,
1212                           ops,
1213                           1,
1214                           NULL, 0,
1215                           rbd_simple_req_cb, 0, NULL);
1216
1217         rbd_destroy_ops(ops);
1218         return ret;
1219 }
1220
1221 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1222 {
1223         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1224         int rc;
1225
1226         if (!rbd_dev)
1227                 return;
1228
1229         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
1230                 rbd_dev->header_name, notify_id, (int) opcode);
1231         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1232         rc = __rbd_refresh_header(rbd_dev);
1233         mutex_unlock(&ctl_mutex);
1234         if (rc)
1235                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1236                            " update snaps: %d\n", rbd_dev->major, rc);
1237
1238         rbd_req_sync_notify_ack(rbd_dev, ver, notify_id, rbd_dev->header_name);
1239 }
1240
1241 /*
1242  * Request sync osd watch
1243  */
1244 static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
1245                               const char *object_name,
1246                               u64 ver)
1247 {
1248         struct ceph_osd_req_op *ops;
1249         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1250
1251         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1252         if (ret < 0)
1253                 return ret;
1254
1255         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1256                                      (void *)rbd_dev, &rbd_dev->watch_event);
1257         if (ret < 0)
1258                 goto fail;
1259
1260         ops[0].watch.ver = cpu_to_le64(ver);
1261         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1262         ops[0].watch.flag = 1;
1263
1264         ret = rbd_req_sync_op(rbd_dev, NULL,
1265                               CEPH_NOSNAP,
1266                               0,
1267                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1268                               ops,
1269                               1, object_name, 0, 0, NULL,
1270                               &rbd_dev->watch_request, NULL);
1271
1272         if (ret < 0)
1273                 goto fail_event;
1274
1275         rbd_destroy_ops(ops);
1276         return 0;
1277
1278 fail_event:
1279         ceph_osdc_cancel_event(rbd_dev->watch_event);
1280         rbd_dev->watch_event = NULL;
1281 fail:
1282         rbd_destroy_ops(ops);
1283         return ret;
1284 }
1285
1286 /*
1287  * Request sync osd unwatch
1288  */
1289 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
1290                                 const char *object_name)
1291 {
1292         struct ceph_osd_req_op *ops;
1293
1294         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1295         if (ret < 0)
1296                 return ret;
1297
1298         ops[0].watch.ver = 0;
1299         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1300         ops[0].watch.flag = 0;
1301
1302         ret = rbd_req_sync_op(rbd_dev, NULL,
1303                               CEPH_NOSNAP,
1304                               0,
1305                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1306                               ops,
1307                               1, object_name, 0, 0, NULL, NULL, NULL);
1308
1309         rbd_destroy_ops(ops);
1310         ceph_osdc_cancel_event(rbd_dev->watch_event);
1311         rbd_dev->watch_event = NULL;
1312         return ret;
1313 }
1314
1315 struct rbd_notify_info {
1316         struct rbd_device *rbd_dev;
1317 };
1318
1319 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1320 {
1321         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1322         if (!rbd_dev)
1323                 return;
1324
1325         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1326                                 rbd_dev->header_name,
1327                 notify_id, (int)opcode);
1328 }
1329
1330 /*
1331  * Request sync osd notify
1332  */
1333 static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1334                           const char *object_name)
1335 {
1336         struct ceph_osd_req_op *ops;
1337         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1338         struct ceph_osd_event *event;
1339         struct rbd_notify_info info;
1340         int payload_len = sizeof(u32) + sizeof(u32);
1341         int ret;
1342
1343         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1344         if (ret < 0)
1345                 return ret;
1346
1347         info.rbd_dev = rbd_dev;
1348
1349         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1350                                      (void *)&info, &event);
1351         if (ret < 0)
1352                 goto fail;
1353
1354         ops[0].watch.ver = 1;
1355         ops[0].watch.flag = 1;
1356         ops[0].watch.cookie = event->cookie;
1357         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1358         ops[0].watch.timeout = 12;
1359
1360         ret = rbd_req_sync_op(rbd_dev, NULL,
1361                                CEPH_NOSNAP,
1362                                0,
1363                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1364                                ops,
1365                                1, object_name, 0, 0, NULL, NULL, NULL);
1366         if (ret < 0)
1367                 goto fail_event;
1368
1369         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1370         dout("ceph_osdc_wait_event returned %d\n", ret);
1371         rbd_destroy_ops(ops);
1372         return 0;
1373
1374 fail_event:
1375         ceph_osdc_cancel_event(event);
1376 fail:
1377         rbd_destroy_ops(ops);
1378         return ret;
1379 }
1380
1381 /*
1382  * Request sync osd read
1383  */
1384 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1385                              const char *object_name,
1386                              const char *class_name,
1387                              const char *method_name,
1388                              const char *data,
1389                              int len,
1390                              u64 *ver)
1391 {
1392         struct ceph_osd_req_op *ops;
1393         int class_name_len = strlen(class_name);
1394         int method_name_len = strlen(method_name);
1395         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1396                                     class_name_len + method_name_len + len);
1397         if (ret < 0)
1398                 return ret;
1399
1400         ops[0].cls.class_name = class_name;
1401         ops[0].cls.class_len = (__u8) class_name_len;
1402         ops[0].cls.method_name = method_name;
1403         ops[0].cls.method_len = (__u8) method_name_len;
1404         ops[0].cls.argc = 0;
1405         ops[0].cls.indata = data;
1406         ops[0].cls.indata_len = len;
1407
1408         ret = rbd_req_sync_op(rbd_dev, NULL,
1409                                CEPH_NOSNAP,
1410                                0,
1411                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1412                                ops,
1413                                1, object_name, 0, 0, NULL, NULL, ver);
1414
1415         rbd_destroy_ops(ops);
1416
1417         dout("cls_exec returned %d\n", ret);
1418         return ret;
1419 }
1420
1421 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1422 {
1423         struct rbd_req_coll *coll =
1424                         kzalloc(sizeof(struct rbd_req_coll) +
1425                                 sizeof(struct rbd_req_status) * num_reqs,
1426                                 GFP_ATOMIC);
1427
1428         if (!coll)
1429                 return NULL;
1430         coll->total = num_reqs;
1431         kref_init(&coll->kref);
1432         return coll;
1433 }
1434
1435 /*
1436  * block device queue callback
1437  */
1438 static void rbd_rq_fn(struct request_queue *q)
1439 {
1440         struct rbd_device *rbd_dev = q->queuedata;
1441         struct request *rq;
1442         struct bio_pair *bp = NULL;
1443
1444         while ((rq = blk_fetch_request(q))) {
1445                 struct bio *bio;
1446                 struct bio *rq_bio, *next_bio = NULL;
1447                 bool do_write;
1448                 int size, op_size = 0;
1449                 u64 ofs;
1450                 int num_segs, cur_seg = 0;
1451                 struct rbd_req_coll *coll;
1452
1453                 /* peek at request from block layer */
1454                 if (!rq)
1455                         break;
1456
1457                 dout("fetched request\n");
1458
1459                 /* filter out block requests we don't understand */
1460                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1461                         __blk_end_request_all(rq, 0);
1462                         continue;
1463                 }
1464
1465                 /* deduce our operation (read, write) */
1466                 do_write = (rq_data_dir(rq) == WRITE);
1467
1468                 size = blk_rq_bytes(rq);
1469                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1470                 rq_bio = rq->bio;
1471                 if (do_write && rbd_dev->read_only) {
1472                         __blk_end_request_all(rq, -EROFS);
1473                         continue;
1474                 }
1475
1476                 spin_unlock_irq(q->queue_lock);
1477
1478                 dout("%s 0x%x bytes at 0x%llx\n",
1479                      do_write ? "write" : "read",
1480                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1481
1482                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1483                 coll = rbd_alloc_coll(num_segs);
1484                 if (!coll) {
1485                         spin_lock_irq(q->queue_lock);
1486                         __blk_end_request_all(rq, -ENOMEM);
1487                         continue;
1488                 }
1489
1490                 do {
1491                         /* a bio clone to be passed down to OSD req */
1492                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1493                         op_size = rbd_get_segment(&rbd_dev->header,
1494                                                   rbd_dev->header.object_prefix,
1495                                                   ofs, size,
1496                                                   NULL, NULL);
1497                         kref_get(&coll->kref);
1498                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1499                                               op_size, GFP_ATOMIC);
1500                         if (!bio) {
1501                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1502                                                        -ENOMEM, op_size);
1503                                 goto next_seg;
1504                         }
1505
1506
1507                         /* init OSD command: write or read */
1508                         if (do_write)
1509                                 rbd_req_write(rq, rbd_dev,
1510                                               rbd_dev->header.snapc,
1511                                               ofs,
1512                                               op_size, bio,
1513                                               coll, cur_seg);
1514                         else
1515                                 rbd_req_read(rq, rbd_dev,
1516                                              rbd_dev->snap_id,
1517                                              ofs,
1518                                              op_size, bio,
1519                                              coll, cur_seg);
1520
1521 next_seg:
1522                         size -= op_size;
1523                         ofs += op_size;
1524
1525                         cur_seg++;
1526                         rq_bio = next_bio;
1527                 } while (size > 0);
1528                 kref_put(&coll->kref, rbd_coll_release);
1529
1530                 if (bp)
1531                         bio_pair_release(bp);
1532                 spin_lock_irq(q->queue_lock);
1533         }
1534 }
1535
1536 /*
1537  * a queue callback. Makes sure that we don't create a bio that spans across
1538  * multiple osd objects. One exception would be with a single page bios,
1539  * which we handle later at bio_chain_clone
1540  */
1541 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1542                           struct bio_vec *bvec)
1543 {
1544         struct rbd_device *rbd_dev = q->queuedata;
1545         unsigned int chunk_sectors;
1546         sector_t sector;
1547         unsigned int bio_sectors;
1548         int max;
1549
1550         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1551         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1552         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1553
1554         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1555                                  + bio_sectors)) << SECTOR_SHIFT;
1556         if (max < 0)
1557                 max = 0; /* bio_add cannot handle a negative return */
1558         if (max <= bvec->bv_len && bio_sectors == 0)
1559                 return bvec->bv_len;
1560         return max;
1561 }
1562
1563 static void rbd_free_disk(struct rbd_device *rbd_dev)
1564 {
1565         struct gendisk *disk = rbd_dev->disk;
1566
1567         if (!disk)
1568                 return;
1569
1570         rbd_header_free(&rbd_dev->header);
1571
1572         if (disk->flags & GENHD_FL_UP)
1573                 del_gendisk(disk);
1574         if (disk->queue)
1575                 blk_cleanup_queue(disk->queue);
1576         put_disk(disk);
1577 }
1578
1579 /*
1580  * reload the ondisk the header 
1581  */
1582 static int rbd_read_header(struct rbd_device *rbd_dev,
1583                            struct rbd_image_header *header)
1584 {
1585         ssize_t rc;
1586         struct rbd_image_header_ondisk *dh;
1587         u32 snap_count = 0;
1588         u64 ver;
1589         size_t len;
1590
1591         /*
1592          * First reads the fixed-size header to determine the number
1593          * of snapshots, then re-reads it, along with all snapshot
1594          * records as well as their stored names.
1595          */
1596         len = sizeof (*dh);
1597         while (1) {
1598                 dh = kmalloc(len, GFP_KERNEL);
1599                 if (!dh)
1600                         return -ENOMEM;
1601
1602                 rc = rbd_req_sync_read(rbd_dev,
1603                                        NULL, CEPH_NOSNAP,
1604                                        rbd_dev->header_name,
1605                                        0, len,
1606                                        (char *)dh, &ver);
1607                 if (rc < 0)
1608                         goto out_dh;
1609
1610                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1611                 if (rc < 0) {
1612                         if (rc == -ENXIO)
1613                                 pr_warning("unrecognized header format"
1614                                            " for image %s\n",
1615                                            rbd_dev->image_name);
1616                         goto out_dh;
1617                 }
1618
1619                 if (snap_count == header->total_snaps)
1620                         break;
1621
1622                 snap_count = header->total_snaps;
1623                 len = sizeof (*dh) +
1624                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1625                         header->snap_names_len;
1626
1627                 rbd_header_free(header);
1628                 kfree(dh);
1629         }
1630         header->obj_version = ver;
1631
1632 out_dh:
1633         kfree(dh);
1634         return rc;
1635 }
1636
1637 /*
1638  * create a snapshot
1639  */
1640 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1641                                const char *snap_name,
1642                                gfp_t gfp_flags)
1643 {
1644         int name_len = strlen(snap_name);
1645         u64 new_snapid;
1646         int ret;
1647         void *data, *p, *e;
1648         u64 ver;
1649         struct ceph_mon_client *monc;
1650
1651         /* we should create a snapshot only if we're pointing at the head */
1652         if (rbd_dev->snap_id != CEPH_NOSNAP)
1653                 return -EINVAL;
1654
1655         monc = &rbd_dev->rbd_client->client->monc;
1656         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1657         dout("created snapid=%lld\n", new_snapid);
1658         if (ret < 0)
1659                 return ret;
1660
1661         data = kmalloc(name_len + 16, gfp_flags);
1662         if (!data)
1663                 return -ENOMEM;
1664
1665         p = data;
1666         e = data + name_len + 16;
1667
1668         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1669         ceph_encode_64_safe(&p, e, new_snapid, bad);
1670
1671         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1672                                 "rbd", "snap_add",
1673                                 data, p - data, &ver);
1674
1675         kfree(data);
1676
1677         if (ret < 0)
1678                 return ret;
1679
1680         down_write(&rbd_dev->header_rwsem);
1681         rbd_dev->header.snapc->seq = new_snapid;
1682         up_write(&rbd_dev->header_rwsem);
1683
1684         return 0;
1685 bad:
1686         return -ERANGE;
1687 }
1688
1689 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1690 {
1691         struct rbd_snap *snap;
1692
1693         while (!list_empty(&rbd_dev->snaps)) {
1694                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1695                 __rbd_remove_snap_dev(rbd_dev, snap);
1696         }
1697 }
1698
1699 /*
1700  * only read the first part of the ondisk header, without the snaps info
1701  */
1702 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1703 {
1704         int ret;
1705         struct rbd_image_header h;
1706         u64 snap_seq;
1707         int follow_seq = 0;
1708
1709         ret = rbd_read_header(rbd_dev, &h);
1710         if (ret < 0)
1711                 return ret;
1712
1713         /* resized? */
1714         set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1715
1716         down_write(&rbd_dev->header_rwsem);
1717
1718         snap_seq = rbd_dev->header.snapc->seq;
1719         if (rbd_dev->header.total_snaps &&
1720             rbd_dev->header.snapc->snaps[0] == snap_seq)
1721                 /* pointing at the head, will need to follow that
1722                    if head moves */
1723                 follow_seq = 1;
1724
1725         /* rbd_dev->header.object_prefix shouldn't change */
1726         kfree(rbd_dev->header.snap_sizes);
1727         kfree(rbd_dev->header.snap_names);
1728         kfree(rbd_dev->header.snapc);
1729
1730         rbd_dev->header.total_snaps = h.total_snaps;
1731         rbd_dev->header.snapc = h.snapc;
1732         rbd_dev->header.snap_names = h.snap_names;
1733         rbd_dev->header.snap_names_len = h.snap_names_len;
1734         rbd_dev->header.snap_sizes = h.snap_sizes;
1735         /* Free the extra copy of the object prefix */
1736         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1737         kfree(h.object_prefix);
1738
1739         if (follow_seq)
1740                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1741         else
1742                 rbd_dev->header.snapc->seq = snap_seq;
1743
1744         ret = __rbd_init_snaps_header(rbd_dev);
1745
1746         up_write(&rbd_dev->header_rwsem);
1747
1748         return ret;
1749 }
1750
1751 static int rbd_init_disk(struct rbd_device *rbd_dev)
1752 {
1753         struct gendisk *disk;
1754         struct request_queue *q;
1755         int rc;
1756         u64 segment_size;
1757         u64 total_size = 0;
1758
1759         /* contact OSD, request size info about the object being mapped */
1760         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1761         if (rc)
1762                 return rc;
1763
1764         /* no need to lock here, as rbd_dev is not registered yet */
1765         rc = __rbd_init_snaps_header(rbd_dev);
1766         if (rc)
1767                 return rc;
1768
1769         rc = rbd_header_set_snap(rbd_dev, &total_size);
1770         if (rc)
1771                 return rc;
1772
1773         /* create gendisk info */
1774         rc = -ENOMEM;
1775         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1776         if (!disk)
1777                 goto out;
1778
1779         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1780                  rbd_dev->id);
1781         disk->major = rbd_dev->major;
1782         disk->first_minor = 0;
1783         disk->fops = &rbd_bd_ops;
1784         disk->private_data = rbd_dev;
1785
1786         /* init rq */
1787         rc = -ENOMEM;
1788         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1789         if (!q)
1790                 goto out_disk;
1791
1792         /* We use the default size, but let's be explicit about it. */
1793         blk_queue_physical_block_size(q, SECTOR_SIZE);
1794
1795         /* set io sizes to object size */
1796         segment_size = rbd_obj_bytes(&rbd_dev->header);
1797         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1798         blk_queue_max_segment_size(q, segment_size);
1799         blk_queue_io_min(q, segment_size);
1800         blk_queue_io_opt(q, segment_size);
1801
1802         blk_queue_merge_bvec(q, rbd_merge_bvec);
1803         disk->queue = q;
1804
1805         q->queuedata = rbd_dev;
1806
1807         rbd_dev->disk = disk;
1808         rbd_dev->q = q;
1809
1810         /* finally, announce the disk to the world */
1811         set_capacity(disk, total_size / SECTOR_SIZE);
1812         add_disk(disk);
1813
1814         pr_info("%s: added with size 0x%llx\n",
1815                 disk->disk_name, (unsigned long long)total_size);
1816         return 0;
1817
1818 out_disk:
1819         put_disk(disk);
1820 out:
1821         return rc;
1822 }
1823
1824 /*
1825   sysfs
1826 */
1827
1828 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1829 {
1830         return container_of(dev, struct rbd_device, dev);
1831 }
1832
1833 static ssize_t rbd_size_show(struct device *dev,
1834                              struct device_attribute *attr, char *buf)
1835 {
1836         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1837
1838         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1839 }
1840
1841 static ssize_t rbd_major_show(struct device *dev,
1842                               struct device_attribute *attr, char *buf)
1843 {
1844         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1845
1846         return sprintf(buf, "%d\n", rbd_dev->major);
1847 }
1848
1849 static ssize_t rbd_client_id_show(struct device *dev,
1850                                   struct device_attribute *attr, char *buf)
1851 {
1852         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1853
1854         return sprintf(buf, "client%lld\n",
1855                         ceph_client_id(rbd_dev->rbd_client->client));
1856 }
1857
1858 static ssize_t rbd_pool_show(struct device *dev,
1859                              struct device_attribute *attr, char *buf)
1860 {
1861         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1862
1863         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1864 }
1865
1866 static ssize_t rbd_pool_id_show(struct device *dev,
1867                              struct device_attribute *attr, char *buf)
1868 {
1869         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1870
1871         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1872 }
1873
1874 static ssize_t rbd_name_show(struct device *dev,
1875                              struct device_attribute *attr, char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "%s\n", rbd_dev->image_name);
1880 }
1881
1882 static ssize_t rbd_snap_show(struct device *dev,
1883                              struct device_attribute *attr,
1884                              char *buf)
1885 {
1886         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1887
1888         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1889 }
1890
1891 static ssize_t rbd_image_refresh(struct device *dev,
1892                                  struct device_attribute *attr,
1893                                  const char *buf,
1894                                  size_t size)
1895 {
1896         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1897         int rc;
1898         int ret = size;
1899
1900         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1901
1902         rc = __rbd_refresh_header(rbd_dev);
1903         if (rc < 0)
1904                 ret = rc;
1905
1906         mutex_unlock(&ctl_mutex);
1907         return ret;
1908 }
1909
1910 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1911 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1912 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1913 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1914 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1915 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1916 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1917 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1918 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1919
1920 static struct attribute *rbd_attrs[] = {
1921         &dev_attr_size.attr,
1922         &dev_attr_major.attr,
1923         &dev_attr_client_id.attr,
1924         &dev_attr_pool.attr,
1925         &dev_attr_pool_id.attr,
1926         &dev_attr_name.attr,
1927         &dev_attr_current_snap.attr,
1928         &dev_attr_refresh.attr,
1929         &dev_attr_create_snap.attr,
1930         NULL
1931 };
1932
1933 static struct attribute_group rbd_attr_group = {
1934         .attrs = rbd_attrs,
1935 };
1936
1937 static const struct attribute_group *rbd_attr_groups[] = {
1938         &rbd_attr_group,
1939         NULL
1940 };
1941
1942 static void rbd_sysfs_dev_release(struct device *dev)
1943 {
1944 }
1945
1946 static struct device_type rbd_device_type = {
1947         .name           = "rbd",
1948         .groups         = rbd_attr_groups,
1949         .release        = rbd_sysfs_dev_release,
1950 };
1951
1952
1953 /*
1954   sysfs - snapshots
1955 */
1956
1957 static ssize_t rbd_snap_size_show(struct device *dev,
1958                                   struct device_attribute *attr,
1959                                   char *buf)
1960 {
1961         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1962
1963         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1964 }
1965
1966 static ssize_t rbd_snap_id_show(struct device *dev,
1967                                 struct device_attribute *attr,
1968                                 char *buf)
1969 {
1970         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1971
1972         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1973 }
1974
1975 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1976 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1977
1978 static struct attribute *rbd_snap_attrs[] = {
1979         &dev_attr_snap_size.attr,
1980         &dev_attr_snap_id.attr,
1981         NULL,
1982 };
1983
1984 static struct attribute_group rbd_snap_attr_group = {
1985         .attrs = rbd_snap_attrs,
1986 };
1987
1988 static void rbd_snap_dev_release(struct device *dev)
1989 {
1990         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1991         kfree(snap->name);
1992         kfree(snap);
1993 }
1994
1995 static const struct attribute_group *rbd_snap_attr_groups[] = {
1996         &rbd_snap_attr_group,
1997         NULL
1998 };
1999
2000 static struct device_type rbd_snap_device_type = {
2001         .groups         = rbd_snap_attr_groups,
2002         .release        = rbd_snap_dev_release,
2003 };
2004
2005 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2006                                   struct rbd_snap *snap)
2007 {
2008         list_del(&snap->node);
2009         device_unregister(&snap->dev);
2010 }
2011
2012 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2013                                   struct rbd_snap *snap,
2014                                   struct device *parent)
2015 {
2016         struct device *dev = &snap->dev;
2017         int ret;
2018
2019         dev->type = &rbd_snap_device_type;
2020         dev->parent = parent;
2021         dev->release = rbd_snap_dev_release;
2022         dev_set_name(dev, "snap_%s", snap->name);
2023         ret = device_register(dev);
2024
2025         return ret;
2026 }
2027
2028 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2029                               int i, const char *name,
2030                               struct rbd_snap **snapp)
2031 {
2032         int ret;
2033         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2034         if (!snap)
2035                 return -ENOMEM;
2036         snap->name = kstrdup(name, GFP_KERNEL);
2037         snap->size = rbd_dev->header.snap_sizes[i];
2038         snap->id = rbd_dev->header.snapc->snaps[i];
2039         if (device_is_registered(&rbd_dev->dev)) {
2040                 ret = rbd_register_snap_dev(rbd_dev, snap,
2041                                              &rbd_dev->dev);
2042                 if (ret < 0)
2043                         goto err;
2044         }
2045         *snapp = snap;
2046         return 0;
2047 err:
2048         kfree(snap->name);
2049         kfree(snap);
2050         return ret;
2051 }
2052
2053 /*
2054  * search for the previous snap in a null delimited string list
2055  */
2056 const char *rbd_prev_snap_name(const char *name, const char *start)
2057 {
2058         if (name < start + 2)
2059                 return NULL;
2060
2061         name -= 2;
2062         while (*name) {
2063                 if (name == start)
2064                         return start;
2065                 name--;
2066         }
2067         return name + 1;
2068 }
2069
2070 /*
2071  * compare the old list of snapshots that we have to what's in the header
2072  * and update it accordingly. Note that the header holds the snapshots
2073  * in a reverse order (from newest to oldest) and we need to go from
2074  * older to new so that we don't get a duplicate snap name when
2075  * doing the process (e.g., removed snapshot and recreated a new
2076  * one with the same name.
2077  */
2078 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2079 {
2080         const char *name, *first_name;
2081         int i = rbd_dev->header.total_snaps;
2082         struct rbd_snap *snap, *old_snap = NULL;
2083         int ret;
2084         struct list_head *p, *n;
2085
2086         first_name = rbd_dev->header.snap_names;
2087         name = first_name + rbd_dev->header.snap_names_len;
2088
2089         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2090                 u64 cur_id;
2091
2092                 old_snap = list_entry(p, struct rbd_snap, node);
2093
2094                 if (i)
2095                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2096
2097                 if (!i || old_snap->id < cur_id) {
2098                         /* old_snap->id was skipped, thus was removed */
2099                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2100                         continue;
2101                 }
2102                 if (old_snap->id == cur_id) {
2103                         /* we have this snapshot already */
2104                         i--;
2105                         name = rbd_prev_snap_name(name, first_name);
2106                         continue;
2107                 }
2108                 for (; i > 0;
2109                      i--, name = rbd_prev_snap_name(name, first_name)) {
2110                         if (!name) {
2111                                 WARN_ON(1);
2112                                 return -EINVAL;
2113                         }
2114                         cur_id = rbd_dev->header.snapc->snaps[i];
2115                         /* snapshot removal? handle it above */
2116                         if (cur_id >= old_snap->id)
2117                                 break;
2118                         /* a new snapshot */
2119                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2120                         if (ret < 0)
2121                                 return ret;
2122
2123                         /* note that we add it backward so using n and not p */
2124                         list_add(&snap->node, n);
2125                         p = &snap->node;
2126                 }
2127         }
2128         /* we're done going over the old snap list, just add what's left */
2129         for (; i > 0; i--) {
2130                 name = rbd_prev_snap_name(name, first_name);
2131                 if (!name) {
2132                         WARN_ON(1);
2133                         return -EINVAL;
2134                 }
2135                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2136                 if (ret < 0)
2137                         return ret;
2138                 list_add(&snap->node, &rbd_dev->snaps);
2139         }
2140
2141         return 0;
2142 }
2143
2144 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2145 {
2146         int ret;
2147         struct device *dev;
2148         struct rbd_snap *snap;
2149
2150         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2151         dev = &rbd_dev->dev;
2152
2153         dev->bus = &rbd_bus_type;
2154         dev->type = &rbd_device_type;
2155         dev->parent = &rbd_root_dev;
2156         dev->release = rbd_dev_release;
2157         dev_set_name(dev, "%d", rbd_dev->id);
2158         ret = device_register(dev);
2159         if (ret < 0)
2160                 goto out;
2161
2162         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2163                 ret = rbd_register_snap_dev(rbd_dev, snap,
2164                                              &rbd_dev->dev);
2165                 if (ret < 0)
2166                         break;
2167         }
2168 out:
2169         mutex_unlock(&ctl_mutex);
2170         return ret;
2171 }
2172
2173 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2174 {
2175         device_unregister(&rbd_dev->dev);
2176 }
2177
2178 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2179 {
2180         int ret, rc;
2181
2182         do {
2183                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
2184                                          rbd_dev->header.obj_version);
2185                 if (ret == -ERANGE) {
2186                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2187                         rc = __rbd_refresh_header(rbd_dev);
2188                         mutex_unlock(&ctl_mutex);
2189                         if (rc < 0)
2190                                 return rc;
2191                 }
2192         } while (ret == -ERANGE);
2193
2194         return ret;
2195 }
2196
2197 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2198
2199 /*
2200  * Get a unique rbd identifier for the given new rbd_dev, and add
2201  * the rbd_dev to the global list.  The minimum rbd id is 1.
2202  */
2203 static void rbd_id_get(struct rbd_device *rbd_dev)
2204 {
2205         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2206
2207         spin_lock(&rbd_dev_list_lock);
2208         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2209         spin_unlock(&rbd_dev_list_lock);
2210 }
2211
2212 /*
2213  * Remove an rbd_dev from the global list, and record that its
2214  * identifier is no longer in use.
2215  */
2216 static void rbd_id_put(struct rbd_device *rbd_dev)
2217 {
2218         struct list_head *tmp;
2219         int rbd_id = rbd_dev->id;
2220         int max_id;
2221
2222         BUG_ON(rbd_id < 1);
2223
2224         spin_lock(&rbd_dev_list_lock);
2225         list_del_init(&rbd_dev->node);
2226
2227         /*
2228          * If the id being "put" is not the current maximum, there
2229          * is nothing special we need to do.
2230          */
2231         if (rbd_id != atomic64_read(&rbd_id_max)) {
2232                 spin_unlock(&rbd_dev_list_lock);
2233                 return;
2234         }
2235
2236         /*
2237          * We need to update the current maximum id.  Search the
2238          * list to find out what it is.  We're more likely to find
2239          * the maximum at the end, so search the list backward.
2240          */
2241         max_id = 0;
2242         list_for_each_prev(tmp, &rbd_dev_list) {
2243                 struct rbd_device *rbd_dev;
2244
2245                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2246                 if (rbd_id > max_id)
2247                         max_id = rbd_id;
2248         }
2249         spin_unlock(&rbd_dev_list_lock);
2250
2251         /*
2252          * The max id could have been updated by rbd_id_get(), in
2253          * which case it now accurately reflects the new maximum.
2254          * Be careful not to overwrite the maximum value in that
2255          * case.
2256          */
2257         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2258 }
2259
2260 /*
2261  * Skips over white space at *buf, and updates *buf to point to the
2262  * first found non-space character (if any). Returns the length of
2263  * the token (string of non-white space characters) found.  Note
2264  * that *buf must be terminated with '\0'.
2265  */
2266 static inline size_t next_token(const char **buf)
2267 {
2268         /*
2269         * These are the characters that produce nonzero for
2270         * isspace() in the "C" and "POSIX" locales.
2271         */
2272         const char *spaces = " \f\n\r\t\v";
2273
2274         *buf += strspn(*buf, spaces);   /* Find start of token */
2275
2276         return strcspn(*buf, spaces);   /* Return token length */
2277 }
2278
2279 /*
2280  * Finds the next token in *buf, and if the provided token buffer is
2281  * big enough, copies the found token into it.  The result, if
2282  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2283  * must be terminated with '\0' on entry.
2284  *
2285  * Returns the length of the token found (not including the '\0').
2286  * Return value will be 0 if no token is found, and it will be >=
2287  * token_size if the token would not fit.
2288  *
2289  * The *buf pointer will be updated to point beyond the end of the
2290  * found token.  Note that this occurs even if the token buffer is
2291  * too small to hold it.
2292  */
2293 static inline size_t copy_token(const char **buf,
2294                                 char *token,
2295                                 size_t token_size)
2296 {
2297         size_t len;
2298
2299         len = next_token(buf);
2300         if (len < token_size) {
2301                 memcpy(token, *buf, len);
2302                 *(token + len) = '\0';
2303         }
2304         *buf += len;
2305
2306         return len;
2307 }
2308
2309 /*
2310  * Finds the next token in *buf, dynamically allocates a buffer big
2311  * enough to hold a copy of it, and copies the token into the new
2312  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2313  * that a duplicate buffer is created even for a zero-length token.
2314  *
2315  * Returns a pointer to the newly-allocated duplicate, or a null
2316  * pointer if memory for the duplicate was not available.  If
2317  * the lenp argument is a non-null pointer, the length of the token
2318  * (not including the '\0') is returned in *lenp.
2319  *
2320  * If successful, the *buf pointer will be updated to point beyond
2321  * the end of the found token.
2322  *
2323  * Note: uses GFP_KERNEL for allocation.
2324  */
2325 static inline char *dup_token(const char **buf, size_t *lenp)
2326 {
2327         char *dup;
2328         size_t len;
2329
2330         len = next_token(buf);
2331         dup = kmalloc(len + 1, GFP_KERNEL);
2332         if (!dup)
2333                 return NULL;
2334
2335         memcpy(dup, *buf, len);
2336         *(dup + len) = '\0';
2337         *buf += len;
2338
2339         if (lenp)
2340                 *lenp = len;
2341
2342         return dup;
2343 }
2344
2345 /*
2346  * This fills in the pool_name, image_name, image_name_len, snap_name,
2347  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2348  * on the list of monitor addresses and other options provided via
2349  * /sys/bus/rbd/add.
2350  *
2351  * Note: rbd_dev is assumed to have been initially zero-filled.
2352  */
2353 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2354                               const char *buf,
2355                               const char **mon_addrs,
2356                               size_t *mon_addrs_size,
2357                               char *options,
2358                              size_t options_size)
2359 {
2360         size_t len;
2361         int ret;
2362
2363         /* The first four tokens are required */
2364
2365         len = next_token(&buf);
2366         if (!len)
2367                 return -EINVAL;
2368         *mon_addrs_size = len + 1;
2369         *mon_addrs = buf;
2370
2371         buf += len;
2372
2373         len = copy_token(&buf, options, options_size);
2374         if (!len || len >= options_size)
2375                 return -EINVAL;
2376
2377         ret = -ENOMEM;
2378         rbd_dev->pool_name = dup_token(&buf, NULL);
2379         if (!rbd_dev->pool_name)
2380                 goto out_err;
2381
2382         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2383         if (!rbd_dev->image_name)
2384                 goto out_err;
2385
2386         /* Create the name of the header object */
2387
2388         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2389                                                 + sizeof (RBD_SUFFIX),
2390                                         GFP_KERNEL);
2391         if (!rbd_dev->header_name)
2392                 goto out_err;
2393         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2394
2395         /*
2396          * The snapshot name is optional.  If none is is supplied,
2397          * we use the default value.
2398          */
2399         rbd_dev->snap_name = dup_token(&buf, &len);
2400         if (!rbd_dev->snap_name)
2401                 goto out_err;
2402         if (!len) {
2403                 /* Replace the empty name with the default */
2404                 kfree(rbd_dev->snap_name);
2405                 rbd_dev->snap_name
2406                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2407                 if (!rbd_dev->snap_name)
2408                         goto out_err;
2409
2410                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2411                         sizeof (RBD_SNAP_HEAD_NAME));
2412         }
2413
2414         return 0;
2415
2416 out_err:
2417         kfree(rbd_dev->header_name);
2418         kfree(rbd_dev->image_name);
2419         kfree(rbd_dev->pool_name);
2420         rbd_dev->pool_name = NULL;
2421
2422         return ret;
2423 }
2424
2425 static ssize_t rbd_add(struct bus_type *bus,
2426                        const char *buf,
2427                        size_t count)
2428 {
2429         char *options;
2430         struct rbd_device *rbd_dev = NULL;
2431         const char *mon_addrs = NULL;
2432         size_t mon_addrs_size = 0;
2433         struct ceph_osd_client *osdc;
2434         int rc = -ENOMEM;
2435
2436         if (!try_module_get(THIS_MODULE))
2437                 return -ENODEV;
2438
2439         options = kmalloc(count, GFP_KERNEL);
2440         if (!options)
2441                 goto err_nomem;
2442         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2443         if (!rbd_dev)
2444                 goto err_nomem;
2445
2446         /* static rbd_device initialization */
2447         spin_lock_init(&rbd_dev->lock);
2448         INIT_LIST_HEAD(&rbd_dev->node);
2449         INIT_LIST_HEAD(&rbd_dev->snaps);
2450         init_rwsem(&rbd_dev->header_rwsem);
2451
2452         init_rwsem(&rbd_dev->header_rwsem);
2453
2454         /* generate unique id: find highest unique id, add one */
2455         rbd_id_get(rbd_dev);
2456
2457         /* Fill in the device name, now that we have its id. */
2458         BUILD_BUG_ON(DEV_NAME_LEN
2459                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2460         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2461
2462         /* parse add command */
2463         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2464                                 options, count);
2465         if (rc)
2466                 goto err_put_id;
2467
2468         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2469                                                 options);
2470         if (IS_ERR(rbd_dev->rbd_client)) {
2471                 rc = PTR_ERR(rbd_dev->rbd_client);
2472                 goto err_put_id;
2473         }
2474
2475         /* pick the pool */
2476         osdc = &rbd_dev->rbd_client->client->osdc;
2477         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2478         if (rc < 0)
2479                 goto err_out_client;
2480         rbd_dev->pool_id = rc;
2481
2482         /* register our block device */
2483         rc = register_blkdev(0, rbd_dev->name);
2484         if (rc < 0)
2485                 goto err_out_client;
2486         rbd_dev->major = rc;
2487
2488         rc = rbd_bus_add_dev(rbd_dev);
2489         if (rc)
2490                 goto err_out_blkdev;
2491
2492         /*
2493          * At this point cleanup in the event of an error is the job
2494          * of the sysfs code (initiated by rbd_bus_del_dev()).
2495          *
2496          * Set up and announce blkdev mapping.
2497          */
2498         rc = rbd_init_disk(rbd_dev);
2499         if (rc)
2500                 goto err_out_bus;
2501
2502         rc = rbd_init_watch_dev(rbd_dev);
2503         if (rc)
2504                 goto err_out_bus;
2505
2506         return count;
2507
2508 err_out_bus:
2509         /* this will also clean up rest of rbd_dev stuff */
2510
2511         rbd_bus_del_dev(rbd_dev);
2512         kfree(options);
2513         return rc;
2514
2515 err_out_blkdev:
2516         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2517 err_out_client:
2518         rbd_put_client(rbd_dev);
2519 err_put_id:
2520         if (rbd_dev->pool_name) {
2521                 kfree(rbd_dev->snap_name);
2522                 kfree(rbd_dev->header_name);
2523                 kfree(rbd_dev->image_name);
2524                 kfree(rbd_dev->pool_name);
2525         }
2526         rbd_id_put(rbd_dev);
2527 err_nomem:
2528         kfree(rbd_dev);
2529         kfree(options);
2530
2531         dout("Error adding device %s\n", buf);
2532         module_put(THIS_MODULE);
2533
2534         return (ssize_t) rc;
2535 }
2536
2537 static struct rbd_device *__rbd_get_dev(unsigned long id)
2538 {
2539         struct list_head *tmp;
2540         struct rbd_device *rbd_dev;
2541
2542         spin_lock(&rbd_dev_list_lock);
2543         list_for_each(tmp, &rbd_dev_list) {
2544                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2545                 if (rbd_dev->id == id) {
2546                         spin_unlock(&rbd_dev_list_lock);
2547                         return rbd_dev;
2548                 }
2549         }
2550         spin_unlock(&rbd_dev_list_lock);
2551         return NULL;
2552 }
2553
2554 static void rbd_dev_release(struct device *dev)
2555 {
2556         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2557
2558         if (rbd_dev->watch_request) {
2559                 struct ceph_client *client = rbd_dev->rbd_client->client;
2560
2561                 ceph_osdc_unregister_linger_request(&client->osdc,
2562                                                     rbd_dev->watch_request);
2563         }
2564         if (rbd_dev->watch_event)
2565                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);
2566
2567         rbd_put_client(rbd_dev);
2568
2569         /* clean up and free blkdev */
2570         rbd_free_disk(rbd_dev);
2571         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2572
2573         /* done with the id, and with the rbd_dev */
2574         kfree(rbd_dev->snap_name);
2575         kfree(rbd_dev->header_name);
2576         kfree(rbd_dev->pool_name);
2577         kfree(rbd_dev->image_name);
2578         rbd_id_put(rbd_dev);
2579         kfree(rbd_dev);
2580
2581         /* release module ref */
2582         module_put(THIS_MODULE);
2583 }
2584
2585 static ssize_t rbd_remove(struct bus_type *bus,
2586                           const char *buf,
2587                           size_t count)
2588 {
2589         struct rbd_device *rbd_dev = NULL;
2590         int target_id, rc;
2591         unsigned long ul;
2592         int ret = count;
2593
2594         rc = strict_strtoul(buf, 10, &ul);
2595         if (rc)
2596                 return rc;
2597
2598         /* convert to int; abort if we lost anything in the conversion */
2599         target_id = (int) ul;
2600         if (target_id != ul)
2601                 return -EINVAL;
2602
2603         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2604
2605         rbd_dev = __rbd_get_dev(target_id);
2606         if (!rbd_dev) {
2607                 ret = -ENOENT;
2608                 goto done;
2609         }
2610
2611         __rbd_remove_all_snaps(rbd_dev);
2612         rbd_bus_del_dev(rbd_dev);
2613
2614 done:
2615         mutex_unlock(&ctl_mutex);
2616         return ret;
2617 }
2618
2619 static ssize_t rbd_snap_add(struct device *dev,
2620                             struct device_attribute *attr,
2621                             const char *buf,
2622                             size_t count)
2623 {
2624         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2625         int ret;
2626         char *name = kmalloc(count + 1, GFP_KERNEL);
2627         if (!name)
2628                 return -ENOMEM;
2629
2630         snprintf(name, count, "%s", buf);
2631
2632         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2633
2634         ret = rbd_header_add_snap(rbd_dev,
2635                                   name, GFP_KERNEL);
2636         if (ret < 0)
2637                 goto err_unlock;
2638
2639         ret = __rbd_refresh_header(rbd_dev);
2640         if (ret < 0)
2641                 goto err_unlock;
2642
2643         /* shouldn't hold ctl_mutex when notifying.. notify might
2644            trigger a watch callback that would need to get that mutex */
2645         mutex_unlock(&ctl_mutex);
2646
2647         /* make a best effort, don't error if failed */
2648         rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
2649
2650         ret = count;
2651         kfree(name);
2652         return ret;
2653
2654 err_unlock:
2655         mutex_unlock(&ctl_mutex);
2656         kfree(name);
2657         return ret;
2658 }
2659
2660 /*
2661  * create control files in sysfs
2662  * /sys/bus/rbd/...
2663  */
2664 static int rbd_sysfs_init(void)
2665 {
2666         int ret;
2667
2668         ret = device_register(&rbd_root_dev);
2669         if (ret < 0)
2670                 return ret;
2671
2672         ret = bus_register(&rbd_bus_type);
2673         if (ret < 0)
2674                 device_unregister(&rbd_root_dev);
2675
2676         return ret;
2677 }
2678
2679 static void rbd_sysfs_cleanup(void)
2680 {
2681         bus_unregister(&rbd_bus_type);
2682         device_unregister(&rbd_root_dev);
2683 }
2684
2685 int __init rbd_init(void)
2686 {
2687         int rc;
2688
2689         rc = rbd_sysfs_init();
2690         if (rc)
2691                 return rc;
2692         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2693         return 0;
2694 }
2695
2696 void __exit rbd_exit(void)
2697 {
2698         rbd_sysfs_cleanup();
2699 }
2700
2701 module_init(rbd_init);
2702 module_exit(rbd_exit);
2703
2704 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2705 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2706 MODULE_DESCRIPTION("rados block device");
2707
2708 /* following authorship retained from original osdblk.c */
2709 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2710
2711 MODULE_LICENSE("GPL");