rbd: preserve snapc->seq in rbd_header_set_snap()
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
44 /*
45  * The basic unit of block I/O is a sector.  It is interpreted in a
46  * number of contexts in Linux (blk, bio, genhd), but the default is
47  * universally 512 bytes.  These symbols are just slightly more
48  * meaningful than the bare numbers they represent.
49  */
50 #define SECTOR_SHIFT    9
51 #define SECTOR_SIZE     (1ULL << SECTOR_SHIFT)
52
53 #define RBD_DRV_NAME "rbd"
54 #define RBD_DRV_NAME_LONG "rbd (rados block device)"
55
56 #define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */
57
58 #define RBD_MAX_SNAP_NAME_LEN   32
59 #define RBD_MAX_OPT_LEN         1024
60
61 #define RBD_SNAP_HEAD_NAME      "-"
62
63 /*
64  * An RBD device name will be "rbd#", where the "rbd" comes from
65  * RBD_DRV_NAME above, and # is a unique integer identifier.
66  * MAX_INT_FORMAT_WIDTH is used in ensuring DEV_NAME_LEN is big
67  * enough to hold all possible device names.
68  */
69 #define DEV_NAME_LEN            32
70 #define MAX_INT_FORMAT_WIDTH    ((5 * sizeof (int)) / 2 + 1)
71
72 #define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u64 snap_seq;
86         u32 total_snaps;
87
88         char *snap_names;
89         u64 *snap_sizes;
90
91         u64 obj_version;
92 };
93
94 struct rbd_options {
95         int     notify_timeout;
96 };
97
98 /*
99  * an instance of the client.  multiple devices may share an rbd client.
100  */
101 struct rbd_client {
102         struct ceph_client      *client;
103         struct rbd_options      *rbd_opts;
104         struct kref             kref;
105         struct list_head        node;
106 };
107
108 /*
109  * a request completion status
110  */
111 struct rbd_req_status {
112         int done;
113         int rc;
114         u64 bytes;
115 };
116
117 /*
118  * a collection of requests
119  */
120 struct rbd_req_coll {
121         int                     total;
122         int                     num_done;
123         struct kref             kref;
124         struct rbd_req_status   status[0];
125 };
126
127 /*
128  * a single io request
129  */
130 struct rbd_request {
131         struct request          *rq;            /* blk layer request */
132         struct bio              *bio;           /* cloned bio */
133         struct page             **pages;        /* list of used pages */
134         u64                     len;
135         int                     coll_index;
136         struct rbd_req_coll     *coll;
137 };
138
139 struct rbd_snap {
140         struct  device          dev;
141         const char              *name;
142         u64                     size;
143         struct list_head        node;
144         u64                     id;
145 };
146
147 /*
148  * a single device
149  */
150 struct rbd_device {
151         int                     id;             /* blkdev unique id */
152
153         int                     major;          /* blkdev assigned major */
154         struct gendisk          *disk;          /* blkdev's gendisk and rq */
155         struct request_queue    *q;
156
157         struct rbd_client       *rbd_client;
158
159         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161         spinlock_t              lock;           /* queue lock */
162
163         struct rbd_image_header header;
164         char                    *image_name;
165         size_t                  image_name_len;
166         char                    *header_name;
167         char                    *pool_name;
168         int                     pool_id;
169
170         struct ceph_osd_event   *watch_event;
171         struct ceph_osd_request *watch_request;
172
173         /* protects updating the header */
174         struct rw_semaphore     header_rwsem;
175         /* name of the snapshot this device reads from */
176         char                    *snap_name;
177         /* id of the snapshot this device reads from */
178         u64                     snap_id;        /* current snapshot id */
179         /* whether the snap_id this device reads from still exists */
180         bool                    snap_exists;
181         int                     read_only;
182
183         struct list_head        node;
184
185         /* list of snapshots */
186         struct list_head        snaps;
187
188         /* sysfs related */
189         struct device           dev;
190 };
191
192 static DEFINE_MUTEX(ctl_mutex);   /* Serialize open/close/setup/teardown */
193
194 static LIST_HEAD(rbd_dev_list);    /* devices */
195 static DEFINE_SPINLOCK(rbd_dev_list_lock);
196
197 static LIST_HEAD(rbd_client_list);              /* clients */
198 static DEFINE_SPINLOCK(rbd_client_list_lock);
199
200 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
201 static void rbd_dev_release(struct device *dev);
202 static ssize_t rbd_snap_add(struct device *dev,
203                             struct device_attribute *attr,
204                             const char *buf,
205                             size_t count);
206 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
207                                   struct rbd_snap *snap);
208
209 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
210                        size_t count);
211 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
212                           size_t count);
213
214 static struct bus_attribute rbd_bus_attrs[] = {
215         __ATTR(add, S_IWUSR, NULL, rbd_add),
216         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
217         __ATTR_NULL
218 };
219
220 static struct bus_type rbd_bus_type = {
221         .name           = "rbd",
222         .bus_attrs      = rbd_bus_attrs,
223 };
224
225 static void rbd_root_dev_release(struct device *dev)
226 {
227 }
228
229 static struct device rbd_root_dev = {
230         .init_name =    "rbd",
231         .release =      rbd_root_dev_release,
232 };
233
234
235 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
236 {
237         return get_device(&rbd_dev->dev);
238 }
239
240 static void rbd_put_dev(struct rbd_device *rbd_dev)
241 {
242         put_device(&rbd_dev->dev);
243 }
244
245 static int __rbd_refresh_header(struct rbd_device *rbd_dev);
246
247 static int rbd_open(struct block_device *bdev, fmode_t mode)
248 {
249         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
250
251         rbd_get_dev(rbd_dev);
252
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
256                 return -EROFS;
257
258         return 0;
259 }
260
261 static int rbd_release(struct gendisk *disk, fmode_t mode)
262 {
263         struct rbd_device *rbd_dev = disk->private_data;
264
265         rbd_put_dev(rbd_dev);
266
267         return 0;
268 }
269
270 static const struct block_device_operations rbd_bd_ops = {
271         .owner                  = THIS_MODULE,
272         .open                   = rbd_open,
273         .release                = rbd_release,
274 };
275
276 /*
277  * Initialize an rbd client instance.
278  * We own *ceph_opts.
279  */
280 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
281                                             struct rbd_options *rbd_opts)
282 {
283         struct rbd_client *rbdc;
284         int ret = -ENOMEM;
285
286         dout("rbd_client_create\n");
287         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
288         if (!rbdc)
289                 goto out_opt;
290
291         kref_init(&rbdc->kref);
292         INIT_LIST_HEAD(&rbdc->node);
293
294         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
295
296         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
297         if (IS_ERR(rbdc->client))
298                 goto out_mutex;
299         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
300
301         ret = ceph_open_session(rbdc->client);
302         if (ret < 0)
303                 goto out_err;
304
305         rbdc->rbd_opts = rbd_opts;
306
307         spin_lock(&rbd_client_list_lock);
308         list_add_tail(&rbdc->node, &rbd_client_list);
309         spin_unlock(&rbd_client_list_lock);
310
311         mutex_unlock(&ctl_mutex);
312
313         dout("rbd_client_create created %p\n", rbdc);
314         return rbdc;
315
316 out_err:
317         ceph_destroy_client(rbdc->client);
318 out_mutex:
319         mutex_unlock(&ctl_mutex);
320         kfree(rbdc);
321 out_opt:
322         if (ceph_opts)
323                 ceph_destroy_options(ceph_opts);
324         return ERR_PTR(ret);
325 }
326
327 /*
328  * Find a ceph client with specific addr and configuration.
329  */
330 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
331 {
332         struct rbd_client *client_node;
333
334         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
335                 return NULL;
336
337         list_for_each_entry(client_node, &rbd_client_list, node)
338                 if (!ceph_compare_options(ceph_opts, client_node->client))
339                         return client_node;
340         return NULL;
341 }
342
343 /*
344  * mount options
345  */
346 enum {
347         Opt_notify_timeout,
348         Opt_last_int,
349         /* int args above */
350         Opt_last_string,
351         /* string args above */
352 };
353
354 static match_table_t rbd_opts_tokens = {
355         {Opt_notify_timeout, "notify_timeout=%d"},
356         /* int args above */
357         /* string args above */
358         {-1, NULL}
359 };
360
361 static int parse_rbd_opts_token(char *c, void *private)
362 {
363         struct rbd_options *rbd_opts = private;
364         substring_t argstr[MAX_OPT_ARGS];
365         int token, intval, ret;
366
367         token = match_token(c, rbd_opts_tokens, argstr);
368         if (token < 0)
369                 return -EINVAL;
370
371         if (token < Opt_last_int) {
372                 ret = match_int(&argstr[0], &intval);
373                 if (ret < 0) {
374                         pr_err("bad mount option arg (not int) "
375                                "at '%s'\n", c);
376                         return ret;
377                 }
378                 dout("got int token %d val %d\n", token, intval);
379         } else if (token > Opt_last_int && token < Opt_last_string) {
380                 dout("got string token %d val %s\n", token,
381                      argstr[0].from);
382         } else {
383                 dout("got token %d\n", token);
384         }
385
386         switch (token) {
387         case Opt_notify_timeout:
388                 rbd_opts->notify_timeout = intval;
389                 break;
390         default:
391                 BUG_ON(token);
392         }
393         return 0;
394 }
395
396 /*
397  * Get a ceph client with specific addr and configuration, if one does
398  * not exist create it.
399  */
400 static struct rbd_client *rbd_get_client(const char *mon_addr,
401                                          size_t mon_addr_len,
402                                          char *options)
403 {
404         struct rbd_client *rbdc;
405         struct ceph_options *ceph_opts;
406         struct rbd_options *rbd_opts;
407
408         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
409         if (!rbd_opts)
410                 return ERR_PTR(-ENOMEM);
411
412         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
413
414         ceph_opts = ceph_parse_options(options, mon_addr,
415                                         mon_addr + mon_addr_len,
416                                         parse_rbd_opts_token, rbd_opts);
417         if (IS_ERR(ceph_opts)) {
418                 kfree(rbd_opts);
419                 return ERR_CAST(ceph_opts);
420         }
421
422         spin_lock(&rbd_client_list_lock);
423         rbdc = __rbd_client_find(ceph_opts);
424         if (rbdc) {
425                 /* using an existing client */
426                 kref_get(&rbdc->kref);
427                 spin_unlock(&rbd_client_list_lock);
428
429                 ceph_destroy_options(ceph_opts);
430                 kfree(rbd_opts);
431
432                 return rbdc;
433         }
434         spin_unlock(&rbd_client_list_lock);
435
436         rbdc = rbd_client_create(ceph_opts, rbd_opts);
437
438         if (IS_ERR(rbdc))
439                 kfree(rbd_opts);
440
441         return rbdc;
442 }
443
444 /*
445  * Destroy ceph client
446  *
447  * Caller must hold rbd_client_list_lock.
448  */
449 static void rbd_client_release(struct kref *kref)
450 {
451         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
452
453         dout("rbd_release_client %p\n", rbdc);
454         spin_lock(&rbd_client_list_lock);
455         list_del(&rbdc->node);
456         spin_unlock(&rbd_client_list_lock);
457
458         ceph_destroy_client(rbdc->client);
459         kfree(rbdc->rbd_opts);
460         kfree(rbdc);
461 }
462
463 /*
464  * Drop reference to ceph client node. If it's not referenced anymore, release
465  * it.
466  */
467 static void rbd_put_client(struct rbd_device *rbd_dev)
468 {
469         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
470         rbd_dev->rbd_client = NULL;
471 }
472
473 /*
474  * Destroy requests collection
475  */
476 static void rbd_coll_release(struct kref *kref)
477 {
478         struct rbd_req_coll *coll =
479                 container_of(kref, struct rbd_req_coll, kref);
480
481         dout("rbd_coll_release %p\n", coll);
482         kfree(coll);
483 }
484
485 /*
486  * Create a new header structure, translate header format from the on-disk
487  * header.
488  */
489 static int rbd_header_from_disk(struct rbd_image_header *header,
490                                  struct rbd_image_header_ondisk *ondisk,
491                                  u32 allocated_snaps,
492                                  gfp_t gfp_flags)
493 {
494         u32 i, snap_count;
495
496         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
497                 return -ENXIO;
498
499         snap_count = le32_to_cpu(ondisk->snap_count);
500         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
501                          / sizeof (*ondisk))
502                 return -EINVAL;
503         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
504                                 snap_count * sizeof(u64),
505                                 gfp_flags);
506         if (!header->snapc)
507                 return -ENOMEM;
508
509         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
510         if (snap_count) {
511                 header->snap_names = kmalloc(header->snap_names_len,
512                                              gfp_flags);
513                 if (!header->snap_names)
514                         goto err_snapc;
515                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
516                                              gfp_flags);
517                 if (!header->snap_sizes)
518                         goto err_names;
519         } else {
520                 header->snap_names = NULL;
521                 header->snap_sizes = NULL;
522         }
523
524         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
525                                         gfp_flags);
526         if (!header->object_prefix)
527                 goto err_sizes;
528
529         memcpy(header->object_prefix, ondisk->block_name,
530                sizeof(ondisk->block_name));
531         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
532
533         header->image_size = le64_to_cpu(ondisk->image_size);
534         header->obj_order = ondisk->options.order;
535         header->crypt_type = ondisk->options.crypt_type;
536         header->comp_type = ondisk->options.comp_type;
537
538         atomic_set(&header->snapc->nref, 1);
539         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
540         header->snapc->num_snaps = snap_count;
541         header->total_snaps = snap_count;
542
543         if (snap_count && allocated_snaps == snap_count) {
544                 for (i = 0; i < snap_count; i++) {
545                         header->snapc->snaps[i] =
546                                 le64_to_cpu(ondisk->snaps[i].id);
547                         header->snap_sizes[i] =
548                                 le64_to_cpu(ondisk->snaps[i].image_size);
549                 }
550
551                 /* copy snapshot names */
552                 memcpy(header->snap_names, &ondisk->snaps[i],
553                         header->snap_names_len);
554         }
555
556         return 0;
557
558 err_sizes:
559         kfree(header->snap_sizes);
560 err_names:
561         kfree(header->snap_names);
562 err_snapc:
563         kfree(header->snapc);
564         return -ENOMEM;
565 }
566
567 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
568                         u64 *seq, u64 *size)
569 {
570         int i;
571         char *p = header->snap_names;
572
573         for (i = 0; i < header->total_snaps; i++) {
574                 if (!strcmp(snap_name, p)) {
575
576                         /* Found it.  Pass back its id and/or size */
577
578                         if (seq)
579                                 *seq = header->snapc->snaps[i];
580                         if (size)
581                                 *size = header->snap_sizes[i];
582                         return i;
583                 }
584                 p += strlen(p) + 1;     /* Skip ahead to the next name */
585         }
586         return -ENOENT;
587 }
588
589 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
590 {
591         int ret;
592
593         down_write(&rbd_dev->header_rwsem);
594
595         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
596                     sizeof (RBD_SNAP_HEAD_NAME))) {
597                 rbd_dev->snap_id = CEPH_NOSNAP;
598                 rbd_dev->snap_exists = false;
599                 rbd_dev->read_only = 0;
600                 if (size)
601                         *size = rbd_dev->header.image_size;
602         } else {
603                 u64 snap_id = 0;
604
605                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
606                                         &snap_id, size);
607                 if (ret < 0)
608                         goto done;
609                 rbd_dev->snap_id = snap_id;
610                 rbd_dev->snap_exists = true;
611                 rbd_dev->read_only = 1;
612         }
613
614         ret = 0;
615 done:
616         up_write(&rbd_dev->header_rwsem);
617         return ret;
618 }
619
620 static void rbd_header_free(struct rbd_image_header *header)
621 {
622         kfree(header->object_prefix);
623         kfree(header->snap_sizes);
624         kfree(header->snap_names);
625         ceph_put_snap_context(header->snapc);
626 }
627
628 /*
629  * get the actual striped segment name, offset and length
630  */
631 static u64 rbd_get_segment(struct rbd_image_header *header,
632                            const char *object_prefix,
633                            u64 ofs, u64 len,
634                            char *seg_name, u64 *segofs)
635 {
636         u64 seg = ofs >> header->obj_order;
637
638         if (seg_name)
639                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
640                          "%s.%012llx", object_prefix, seg);
641
642         ofs = ofs & ((1 << header->obj_order) - 1);
643         len = min_t(u64, len, (1 << header->obj_order) - ofs);
644
645         if (segofs)
646                 *segofs = ofs;
647
648         return len;
649 }
650
651 static int rbd_get_num_segments(struct rbd_image_header *header,
652                                 u64 ofs, u64 len)
653 {
654         u64 start_seg = ofs >> header->obj_order;
655         u64 end_seg = (ofs + len - 1) >> header->obj_order;
656         return end_seg - start_seg + 1;
657 }
658
659 /*
660  * returns the size of an object in the image
661  */
662 static u64 rbd_obj_bytes(struct rbd_image_header *header)
663 {
664         return 1 << header->obj_order;
665 }
666
667 /*
668  * bio helpers
669  */
670
671 static void bio_chain_put(struct bio *chain)
672 {
673         struct bio *tmp;
674
675         while (chain) {
676                 tmp = chain;
677                 chain = chain->bi_next;
678                 bio_put(tmp);
679         }
680 }
681
682 /*
683  * zeros a bio chain, starting at specific offset
684  */
685 static void zero_bio_chain(struct bio *chain, int start_ofs)
686 {
687         struct bio_vec *bv;
688         unsigned long flags;
689         void *buf;
690         int i;
691         int pos = 0;
692
693         while (chain) {
694                 bio_for_each_segment(bv, chain, i) {
695                         if (pos + bv->bv_len > start_ofs) {
696                                 int remainder = max(start_ofs - pos, 0);
697                                 buf = bvec_kmap_irq(bv, &flags);
698                                 memset(buf + remainder, 0,
699                                        bv->bv_len - remainder);
700                                 bvec_kunmap_irq(buf, &flags);
701                         }
702                         pos += bv->bv_len;
703                 }
704
705                 chain = chain->bi_next;
706         }
707 }
708
709 /*
710  * bio_chain_clone - clone a chain of bios up to a certain length.
711  * might return a bio_pair that will need to be released.
712  */
713 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
714                                    struct bio_pair **bp,
715                                    int len, gfp_t gfpmask)
716 {
717         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
718         int total = 0;
719
720         if (*bp) {
721                 bio_pair_release(*bp);
722                 *bp = NULL;
723         }
724
725         while (old_chain && (total < len)) {
726                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
727                 if (!tmp)
728                         goto err_out;
729
730                 if (total + old_chain->bi_size > len) {
731                         struct bio_pair *bp;
732
733                         /*
734                          * this split can only happen with a single paged bio,
735                          * split_bio will BUG_ON if this is not the case
736                          */
737                         dout("bio_chain_clone split! total=%d remaining=%d"
738                              "bi_size=%d\n",
739                              (int)total, (int)len-total,
740                              (int)old_chain->bi_size);
741
742                         /* split the bio. We'll release it either in the next
743                            call, or it will have to be released outside */
744                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
745                         if (!bp)
746                                 goto err_out;
747
748                         __bio_clone(tmp, &bp->bio1);
749
750                         *next = &bp->bio2;
751                 } else {
752                         __bio_clone(tmp, old_chain);
753                         *next = old_chain->bi_next;
754                 }
755
756                 tmp->bi_bdev = NULL;
757                 gfpmask &= ~__GFP_WAIT;
758                 tmp->bi_next = NULL;
759
760                 if (!new_chain) {
761                         new_chain = tail = tmp;
762                 } else {
763                         tail->bi_next = tmp;
764                         tail = tmp;
765                 }
766                 old_chain = old_chain->bi_next;
767
768                 total += tmp->bi_size;
769         }
770
771         BUG_ON(total < len);
772
773         if (tail)
774                 tail->bi_next = NULL;
775
776         *old = old_chain;
777
778         return new_chain;
779
780 err_out:
781         dout("bio_chain_clone with err\n");
782         bio_chain_put(new_chain);
783         return NULL;
784 }
785
786 /*
787  * helpers for osd request op vectors.
788  */
789 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
790                             int num_ops,
791                             int opcode,
792                             u32 payload_len)
793 {
794         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
795                        GFP_NOIO);
796         if (!*ops)
797                 return -ENOMEM;
798         (*ops)[0].op = opcode;
799         /*
800          * op extent offset and length will be set later on
801          * in calc_raw_layout()
802          */
803         (*ops)[0].payload_len = payload_len;
804         return 0;
805 }
806
807 static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
808 {
809         kfree(ops);
810 }
811
812 static void rbd_coll_end_req_index(struct request *rq,
813                                    struct rbd_req_coll *coll,
814                                    int index,
815                                    int ret, u64 len)
816 {
817         struct request_queue *q;
818         int min, max, i;
819
820         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
821              coll, index, ret, len);
822
823         if (!rq)
824                 return;
825
826         if (!coll) {
827                 blk_end_request(rq, ret, len);
828                 return;
829         }
830
831         q = rq->q;
832
833         spin_lock_irq(q->queue_lock);
834         coll->status[index].done = 1;
835         coll->status[index].rc = ret;
836         coll->status[index].bytes = len;
837         max = min = coll->num_done;
838         while (max < coll->total && coll->status[max].done)
839                 max++;
840
841         for (i = min; i<max; i++) {
842                 __blk_end_request(rq, coll->status[i].rc,
843                                   coll->status[i].bytes);
844                 coll->num_done++;
845                 kref_put(&coll->kref, rbd_coll_release);
846         }
847         spin_unlock_irq(q->queue_lock);
848 }
849
850 static void rbd_coll_end_req(struct rbd_request *req,
851                              int ret, u64 len)
852 {
853         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
854 }
855
856 /*
857  * Send ceph osd request
858  */
859 static int rbd_do_request(struct request *rq,
860                           struct rbd_device *rbd_dev,
861                           struct ceph_snap_context *snapc,
862                           u64 snapid,
863                           const char *object_name, u64 ofs, u64 len,
864                           struct bio *bio,
865                           struct page **pages,
866                           int num_pages,
867                           int flags,
868                           struct ceph_osd_req_op *ops,
869                           struct rbd_req_coll *coll,
870                           int coll_index,
871                           void (*rbd_cb)(struct ceph_osd_request *req,
872                                          struct ceph_msg *msg),
873                           struct ceph_osd_request **linger_req,
874                           u64 *ver)
875 {
876         struct ceph_osd_request *req;
877         struct ceph_file_layout *layout;
878         int ret;
879         u64 bno;
880         struct timespec mtime = CURRENT_TIME;
881         struct rbd_request *req_data;
882         struct ceph_osd_request_head *reqhead;
883         struct ceph_osd_client *osdc;
884
885         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
886         if (!req_data) {
887                 if (coll)
888                         rbd_coll_end_req_index(rq, coll, coll_index,
889                                                -ENOMEM, len);
890                 return -ENOMEM;
891         }
892
893         if (coll) {
894                 req_data->coll = coll;
895                 req_data->coll_index = coll_index;
896         }
897
898         dout("rbd_do_request object_name=%s ofs=%lld len=%lld\n",
899                 object_name, len, ofs);
900
901         osdc = &rbd_dev->rbd_client->client->osdc;
902         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
903                                         false, GFP_NOIO, pages, bio);
904         if (!req) {
905                 ret = -ENOMEM;
906                 goto done_pages;
907         }
908
909         req->r_callback = rbd_cb;
910
911         req_data->rq = rq;
912         req_data->bio = bio;
913         req_data->pages = pages;
914         req_data->len = len;
915
916         req->r_priv = req_data;
917
918         reqhead = req->r_request->front.iov_base;
919         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
920
921         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
922         req->r_oid_len = strlen(req->r_oid);
923
924         layout = &req->r_file_layout;
925         memset(layout, 0, sizeof(*layout));
926         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
927         layout->fl_stripe_count = cpu_to_le32(1);
928         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
930         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
931                                 req, ops);
932
933         ceph_osdc_build_request(req, ofs, &len,
934                                 ops,
935                                 snapc,
936                                 &mtime,
937                                 req->r_oid, req->r_oid_len);
938
939         if (linger_req) {
940                 ceph_osdc_set_request_linger(osdc, req);
941                 *linger_req = req;
942         }
943
944         ret = ceph_osdc_start_request(osdc, req, false);
945         if (ret < 0)
946                 goto done_err;
947
948         if (!rbd_cb) {
949                 ret = ceph_osdc_wait_request(osdc, req);
950                 if (ver)
951                         *ver = le64_to_cpu(req->r_reassert_version.version);
952                 dout("reassert_ver=%lld\n",
953                      le64_to_cpu(req->r_reassert_version.version));
954                 ceph_osdc_put_request(req);
955         }
956         return ret;
957
958 done_err:
959         bio_chain_put(req_data->bio);
960         ceph_osdc_put_request(req);
961 done_pages:
962         rbd_coll_end_req(req_data, ret, len);
963         kfree(req_data);
964         return ret;
965 }
966
967 /*
968  * Ceph osd op callback
969  */
970 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
971 {
972         struct rbd_request *req_data = req->r_priv;
973         struct ceph_osd_reply_head *replyhead;
974         struct ceph_osd_op *op;
975         __s32 rc;
976         u64 bytes;
977         int read_op;
978
979         /* parse reply */
980         replyhead = msg->front.iov_base;
981         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
982         op = (void *)(replyhead + 1);
983         rc = le32_to_cpu(replyhead->result);
984         bytes = le64_to_cpu(op->extent.length);
985         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
986
987         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
988
989         if (rc == -ENOENT && read_op) {
990                 zero_bio_chain(req_data->bio, 0);
991                 rc = 0;
992         } else if (rc == 0 && read_op && bytes < req_data->len) {
993                 zero_bio_chain(req_data->bio, bytes);
994                 bytes = req_data->len;
995         }
996
997         rbd_coll_end_req(req_data, rc, bytes);
998
999         if (req_data->bio)
1000                 bio_chain_put(req_data->bio);
1001
1002         ceph_osdc_put_request(req);
1003         kfree(req_data);
1004 }
1005
1006 static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
1007 {
1008         ceph_osdc_put_request(req);
1009 }
1010
1011 /*
1012  * Do a synchronous ceph osd operation
1013  */
1014 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1015                            struct ceph_snap_context *snapc,
1016                            u64 snapid,
1017                            int opcode,
1018                            int flags,
1019                            struct ceph_osd_req_op *orig_ops,
1020                            const char *object_name,
1021                            u64 ofs, u64 len,
1022                            char *buf,
1023                            struct ceph_osd_request **linger_req,
1024                            u64 *ver)
1025 {
1026         int ret;
1027         struct page **pages;
1028         int num_pages;
1029         struct ceph_osd_req_op *ops = orig_ops;
1030         u32 payload_len;
1031
1032         num_pages = calc_pages_for(ofs , len);
1033         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1034         if (IS_ERR(pages))
1035                 return PTR_ERR(pages);
1036
1037         if (!orig_ops) {
1038                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1039                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1040                 if (ret < 0)
1041                         goto done;
1042
1043                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1044                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1045                         if (ret < 0)
1046                                 goto done_ops;
1047                 }
1048         }
1049
1050         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1051                           object_name, ofs, len, NULL,
1052                           pages, num_pages,
1053                           flags,
1054                           ops,
1055                           NULL, 0,
1056                           NULL,
1057                           linger_req, ver);
1058         if (ret < 0)
1059                 goto done_ops;
1060
1061         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1062                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1063
1064 done_ops:
1065         if (!orig_ops)
1066                 rbd_destroy_ops(ops);
1067 done:
1068         ceph_release_page_vector(pages, num_pages);
1069         return ret;
1070 }
1071
1072 /*
1073  * Do an asynchronous ceph osd operation
1074  */
1075 static int rbd_do_op(struct request *rq,
1076                      struct rbd_device *rbd_dev,
1077                      struct ceph_snap_context *snapc,
1078                      u64 snapid,
1079                      int opcode, int flags,
1080                      u64 ofs, u64 len,
1081                      struct bio *bio,
1082                      struct rbd_req_coll *coll,
1083                      int coll_index)
1084 {
1085         char *seg_name;
1086         u64 seg_ofs;
1087         u64 seg_len;
1088         int ret;
1089         struct ceph_osd_req_op *ops;
1090         u32 payload_len;
1091
1092         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1093         if (!seg_name)
1094                 return -ENOMEM;
1095
1096         seg_len = rbd_get_segment(&rbd_dev->header,
1097                                   rbd_dev->header.object_prefix,
1098                                   ofs, len,
1099                                   seg_name, &seg_ofs);
1100
1101         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1102
1103         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1104         if (ret < 0)
1105                 goto done;
1106
1107         /* we've taken care of segment sizes earlier when we
1108            cloned the bios. We should never have a segment
1109            truncated at this point */
1110         BUG_ON(seg_len < len);
1111
1112         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1113                              seg_name, seg_ofs, seg_len,
1114                              bio,
1115                              NULL, 0,
1116                              flags,
1117                              ops,
1118                              coll, coll_index,
1119                              rbd_req_cb, 0, NULL);
1120
1121         rbd_destroy_ops(ops);
1122 done:
1123         kfree(seg_name);
1124         return ret;
1125 }
1126
1127 /*
1128  * Request async osd write
1129  */
1130 static int rbd_req_write(struct request *rq,
1131                          struct rbd_device *rbd_dev,
1132                          struct ceph_snap_context *snapc,
1133                          u64 ofs, u64 len,
1134                          struct bio *bio,
1135                          struct rbd_req_coll *coll,
1136                          int coll_index)
1137 {
1138         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1139                          CEPH_OSD_OP_WRITE,
1140                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1141                          ofs, len, bio, coll, coll_index);
1142 }
1143
1144 /*
1145  * Request async osd read
1146  */
1147 static int rbd_req_read(struct request *rq,
1148                          struct rbd_device *rbd_dev,
1149                          u64 snapid,
1150                          u64 ofs, u64 len,
1151                          struct bio *bio,
1152                          struct rbd_req_coll *coll,
1153                          int coll_index)
1154 {
1155         return rbd_do_op(rq, rbd_dev, NULL,
1156                          snapid,
1157                          CEPH_OSD_OP_READ,
1158                          CEPH_OSD_FLAG_READ,
1159                          ofs, len, bio, coll, coll_index);
1160 }
1161
1162 /*
1163  * Request sync osd read
1164  */
1165 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1166                           struct ceph_snap_context *snapc,
1167                           u64 snapid,
1168                           const char *object_name,
1169                           u64 ofs, u64 len,
1170                           char *buf,
1171                           u64 *ver)
1172 {
1173         return rbd_req_sync_op(rbd_dev, NULL,
1174                                snapid,
1175                                CEPH_OSD_OP_READ,
1176                                CEPH_OSD_FLAG_READ,
1177                                NULL,
1178                                object_name, ofs, len, buf, NULL, ver);
1179 }
1180
1181 /*
1182  * Request sync osd watch
1183  */
1184 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1185                                    u64 ver,
1186                                    u64 notify_id,
1187                                    const char *object_name)
1188 {
1189         struct ceph_osd_req_op *ops;
1190         int ret;
1191
1192         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1193         if (ret < 0)
1194                 return ret;
1195
1196         ops[0].watch.ver = cpu_to_le64(ver);
1197         ops[0].watch.cookie = notify_id;
1198         ops[0].watch.flag = 0;
1199
1200         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1201                           object_name, 0, 0, NULL,
1202                           NULL, 0,
1203                           CEPH_OSD_FLAG_READ,
1204                           ops,
1205                           NULL, 0,
1206                           rbd_simple_req_cb, 0, NULL);
1207
1208         rbd_destroy_ops(ops);
1209         return ret;
1210 }
1211
1212 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1213 {
1214         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1215         u64 hver;
1216         int rc;
1217
1218         if (!rbd_dev)
1219                 return;
1220
1221         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n",
1222                 rbd_dev->header_name, notify_id, (int) opcode);
1223         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1224         rc = __rbd_refresh_header(rbd_dev);
1225         hver = rbd_dev->header.obj_version;
1226         mutex_unlock(&ctl_mutex);
1227         if (rc)
1228                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1229                            " update snaps: %d\n", rbd_dev->major, rc);
1230
1231         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id, rbd_dev->header_name);
1232 }
1233
1234 /*
1235  * Request sync osd watch
1236  */
1237 static int rbd_req_sync_watch(struct rbd_device *rbd_dev,
1238                               const char *object_name,
1239                               u64 ver)
1240 {
1241         struct ceph_osd_req_op *ops;
1242         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1243
1244         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1245         if (ret < 0)
1246                 return ret;
1247
1248         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249                                      (void *)rbd_dev, &rbd_dev->watch_event);
1250         if (ret < 0)
1251                 goto fail;
1252
1253         ops[0].watch.ver = cpu_to_le64(ver);
1254         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255         ops[0].watch.flag = 1;
1256
1257         ret = rbd_req_sync_op(rbd_dev, NULL,
1258                               CEPH_NOSNAP,
1259                               0,
1260                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1261                               ops,
1262                               object_name, 0, 0, NULL,
1263                               &rbd_dev->watch_request, NULL);
1264
1265         if (ret < 0)
1266                 goto fail_event;
1267
1268         rbd_destroy_ops(ops);
1269         return 0;
1270
1271 fail_event:
1272         ceph_osdc_cancel_event(rbd_dev->watch_event);
1273         rbd_dev->watch_event = NULL;
1274 fail:
1275         rbd_destroy_ops(ops);
1276         return ret;
1277 }
1278
1279 /*
1280  * Request sync osd unwatch
1281  */
1282 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev,
1283                                 const char *object_name)
1284 {
1285         struct ceph_osd_req_op *ops;
1286
1287         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1288         if (ret < 0)
1289                 return ret;
1290
1291         ops[0].watch.ver = 0;
1292         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293         ops[0].watch.flag = 0;
1294
1295         ret = rbd_req_sync_op(rbd_dev, NULL,
1296                               CEPH_NOSNAP,
1297                               0,
1298                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1299                               ops,
1300                               object_name, 0, 0, NULL, NULL, NULL);
1301
1302         rbd_destroy_ops(ops);
1303         ceph_osdc_cancel_event(rbd_dev->watch_event);
1304         rbd_dev->watch_event = NULL;
1305         return ret;
1306 }
1307
1308 struct rbd_notify_info {
1309         struct rbd_device *rbd_dev;
1310 };
1311
1312 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1313 {
1314         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1315         if (!rbd_dev)
1316                 return;
1317
1318         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n",
1319                                 rbd_dev->header_name,
1320                 notify_id, (int)opcode);
1321 }
1322
1323 /*
1324  * Request sync osd notify
1325  */
1326 static int rbd_req_sync_notify(struct rbd_device *rbd_dev,
1327                           const char *object_name)
1328 {
1329         struct ceph_osd_req_op *ops;
1330         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1331         struct ceph_osd_event *event;
1332         struct rbd_notify_info info;
1333         int payload_len = sizeof(u32) + sizeof(u32);
1334         int ret;
1335
1336         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1337         if (ret < 0)
1338                 return ret;
1339
1340         info.rbd_dev = rbd_dev;
1341
1342         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1343                                      (void *)&info, &event);
1344         if (ret < 0)
1345                 goto fail;
1346
1347         ops[0].watch.ver = 1;
1348         ops[0].watch.flag = 1;
1349         ops[0].watch.cookie = event->cookie;
1350         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1351         ops[0].watch.timeout = 12;
1352
1353         ret = rbd_req_sync_op(rbd_dev, NULL,
1354                                CEPH_NOSNAP,
1355                                0,
1356                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1357                                ops,
1358                                object_name, 0, 0, NULL, NULL, NULL);
1359         if (ret < 0)
1360                 goto fail_event;
1361
1362         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1363         dout("ceph_osdc_wait_event returned %d\n", ret);
1364         rbd_destroy_ops(ops);
1365         return 0;
1366
1367 fail_event:
1368         ceph_osdc_cancel_event(event);
1369 fail:
1370         rbd_destroy_ops(ops);
1371         return ret;
1372 }
1373
1374 /*
1375  * Request sync osd read
1376  */
1377 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1378                              const char *object_name,
1379                              const char *class_name,
1380                              const char *method_name,
1381                              const char *data,
1382                              int len,
1383                              u64 *ver)
1384 {
1385         struct ceph_osd_req_op *ops;
1386         int class_name_len = strlen(class_name);
1387         int method_name_len = strlen(method_name);
1388         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1389                                     class_name_len + method_name_len + len);
1390         if (ret < 0)
1391                 return ret;
1392
1393         ops[0].cls.class_name = class_name;
1394         ops[0].cls.class_len = (__u8) class_name_len;
1395         ops[0].cls.method_name = method_name;
1396         ops[0].cls.method_len = (__u8) method_name_len;
1397         ops[0].cls.argc = 0;
1398         ops[0].cls.indata = data;
1399         ops[0].cls.indata_len = len;
1400
1401         ret = rbd_req_sync_op(rbd_dev, NULL,
1402                                CEPH_NOSNAP,
1403                                0,
1404                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1405                                ops,
1406                                object_name, 0, 0, NULL, NULL, ver);
1407
1408         rbd_destroy_ops(ops);
1409
1410         dout("cls_exec returned %d\n", ret);
1411         return ret;
1412 }
1413
1414 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1415 {
1416         struct rbd_req_coll *coll =
1417                         kzalloc(sizeof(struct rbd_req_coll) +
1418                                 sizeof(struct rbd_req_status) * num_reqs,
1419                                 GFP_ATOMIC);
1420
1421         if (!coll)
1422                 return NULL;
1423         coll->total = num_reqs;
1424         kref_init(&coll->kref);
1425         return coll;
1426 }
1427
1428 /*
1429  * block device queue callback
1430  */
1431 static void rbd_rq_fn(struct request_queue *q)
1432 {
1433         struct rbd_device *rbd_dev = q->queuedata;
1434         struct request *rq;
1435         struct bio_pair *bp = NULL;
1436
1437         while ((rq = blk_fetch_request(q))) {
1438                 struct bio *bio;
1439                 struct bio *rq_bio, *next_bio = NULL;
1440                 bool do_write;
1441                 int size, op_size = 0;
1442                 u64 ofs;
1443                 int num_segs, cur_seg = 0;
1444                 struct rbd_req_coll *coll;
1445                 struct ceph_snap_context *snapc;
1446
1447                 /* peek at request from block layer */
1448                 if (!rq)
1449                         break;
1450
1451                 dout("fetched request\n");
1452
1453                 /* filter out block requests we don't understand */
1454                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1455                         __blk_end_request_all(rq, 0);
1456                         continue;
1457                 }
1458
1459                 /* deduce our operation (read, write) */
1460                 do_write = (rq_data_dir(rq) == WRITE);
1461
1462                 size = blk_rq_bytes(rq);
1463                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1464                 rq_bio = rq->bio;
1465                 if (do_write && rbd_dev->read_only) {
1466                         __blk_end_request_all(rq, -EROFS);
1467                         continue;
1468                 }
1469
1470                 spin_unlock_irq(q->queue_lock);
1471
1472                 down_read(&rbd_dev->header_rwsem);
1473
1474                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1475                         up_read(&rbd_dev->header_rwsem);
1476                         dout("request for non-existent snapshot");
1477                         spin_lock_irq(q->queue_lock);
1478                         __blk_end_request_all(rq, -ENXIO);
1479                         continue;
1480                 }
1481
1482                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1483
1484                 up_read(&rbd_dev->header_rwsem);
1485
1486                 dout("%s 0x%x bytes at 0x%llx\n",
1487                      do_write ? "write" : "read",
1488                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1489
1490                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1491                 coll = rbd_alloc_coll(num_segs);
1492                 if (!coll) {
1493                         spin_lock_irq(q->queue_lock);
1494                         __blk_end_request_all(rq, -ENOMEM);
1495                         ceph_put_snap_context(snapc);
1496                         continue;
1497                 }
1498
1499                 do {
1500                         /* a bio clone to be passed down to OSD req */
1501                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1502                         op_size = rbd_get_segment(&rbd_dev->header,
1503                                                   rbd_dev->header.object_prefix,
1504                                                   ofs, size,
1505                                                   NULL, NULL);
1506                         kref_get(&coll->kref);
1507                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1508                                               op_size, GFP_ATOMIC);
1509                         if (!bio) {
1510                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1511                                                        -ENOMEM, op_size);
1512                                 goto next_seg;
1513                         }
1514
1515
1516                         /* init OSD command: write or read */
1517                         if (do_write)
1518                                 rbd_req_write(rq, rbd_dev,
1519                                               snapc,
1520                                               ofs,
1521                                               op_size, bio,
1522                                               coll, cur_seg);
1523                         else
1524                                 rbd_req_read(rq, rbd_dev,
1525                                              rbd_dev->snap_id,
1526                                              ofs,
1527                                              op_size, bio,
1528                                              coll, cur_seg);
1529
1530 next_seg:
1531                         size -= op_size;
1532                         ofs += op_size;
1533
1534                         cur_seg++;
1535                         rq_bio = next_bio;
1536                 } while (size > 0);
1537                 kref_put(&coll->kref, rbd_coll_release);
1538
1539                 if (bp)
1540                         bio_pair_release(bp);
1541                 spin_lock_irq(q->queue_lock);
1542
1543                 ceph_put_snap_context(snapc);
1544         }
1545 }
1546
1547 /*
1548  * a queue callback. Makes sure that we don't create a bio that spans across
1549  * multiple osd objects. One exception would be with a single page bios,
1550  * which we handle later at bio_chain_clone
1551  */
1552 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1553                           struct bio_vec *bvec)
1554 {
1555         struct rbd_device *rbd_dev = q->queuedata;
1556         unsigned int chunk_sectors;
1557         sector_t sector;
1558         unsigned int bio_sectors;
1559         int max;
1560
1561         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1562         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1563         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1564
1565         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1566                                  + bio_sectors)) << SECTOR_SHIFT;
1567         if (max < 0)
1568                 max = 0; /* bio_add cannot handle a negative return */
1569         if (max <= bvec->bv_len && bio_sectors == 0)
1570                 return bvec->bv_len;
1571         return max;
1572 }
1573
1574 static void rbd_free_disk(struct rbd_device *rbd_dev)
1575 {
1576         struct gendisk *disk = rbd_dev->disk;
1577
1578         if (!disk)
1579                 return;
1580
1581         rbd_header_free(&rbd_dev->header);
1582
1583         if (disk->flags & GENHD_FL_UP)
1584                 del_gendisk(disk);
1585         if (disk->queue)
1586                 blk_cleanup_queue(disk->queue);
1587         put_disk(disk);
1588 }
1589
1590 /*
1591  * reload the ondisk the header 
1592  */
1593 static int rbd_read_header(struct rbd_device *rbd_dev,
1594                            struct rbd_image_header *header)
1595 {
1596         ssize_t rc;
1597         struct rbd_image_header_ondisk *dh;
1598         u32 snap_count = 0;
1599         u64 ver;
1600         size_t len;
1601
1602         /*
1603          * First reads the fixed-size header to determine the number
1604          * of snapshots, then re-reads it, along with all snapshot
1605          * records as well as their stored names.
1606          */
1607         len = sizeof (*dh);
1608         while (1) {
1609                 dh = kmalloc(len, GFP_KERNEL);
1610                 if (!dh)
1611                         return -ENOMEM;
1612
1613                 rc = rbd_req_sync_read(rbd_dev,
1614                                        NULL, CEPH_NOSNAP,
1615                                        rbd_dev->header_name,
1616                                        0, len,
1617                                        (char *)dh, &ver);
1618                 if (rc < 0)
1619                         goto out_dh;
1620
1621                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1622                 if (rc < 0) {
1623                         if (rc == -ENXIO)
1624                                 pr_warning("unrecognized header format"
1625                                            " for image %s\n",
1626                                            rbd_dev->image_name);
1627                         goto out_dh;
1628                 }
1629
1630                 if (snap_count == header->total_snaps)
1631                         break;
1632
1633                 snap_count = header->total_snaps;
1634                 len = sizeof (*dh) +
1635                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1636                         header->snap_names_len;
1637
1638                 rbd_header_free(header);
1639                 kfree(dh);
1640         }
1641         header->obj_version = ver;
1642
1643 out_dh:
1644         kfree(dh);
1645         return rc;
1646 }
1647
1648 /*
1649  * create a snapshot
1650  */
1651 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1652                                const char *snap_name,
1653                                gfp_t gfp_flags)
1654 {
1655         int name_len = strlen(snap_name);
1656         u64 new_snapid;
1657         int ret;
1658         void *data, *p, *e;
1659         u64 ver;
1660         struct ceph_mon_client *monc;
1661
1662         /* we should create a snapshot only if we're pointing at the head */
1663         if (rbd_dev->snap_id != CEPH_NOSNAP)
1664                 return -EINVAL;
1665
1666         monc = &rbd_dev->rbd_client->client->monc;
1667         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1668         dout("created snapid=%lld\n", new_snapid);
1669         if (ret < 0)
1670                 return ret;
1671
1672         data = kmalloc(name_len + 16, gfp_flags);
1673         if (!data)
1674                 return -ENOMEM;
1675
1676         p = data;
1677         e = data + name_len + 16;
1678
1679         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1680         ceph_encode_64_safe(&p, e, new_snapid, bad);
1681
1682         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1683                                 "rbd", "snap_add",
1684                                 data, p - data, &ver);
1685
1686         kfree(data);
1687
1688         if (ret < 0)
1689                 return ret;
1690
1691         down_write(&rbd_dev->header_rwsem);
1692         rbd_dev->header.snapc->seq = new_snapid;
1693         up_write(&rbd_dev->header_rwsem);
1694
1695         return 0;
1696 bad:
1697         return -ERANGE;
1698 }
1699
1700 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1701 {
1702         struct rbd_snap *snap;
1703
1704         while (!list_empty(&rbd_dev->snaps)) {
1705                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1706                 __rbd_remove_snap_dev(rbd_dev, snap);
1707         }
1708 }
1709
1710 /*
1711  * only read the first part of the ondisk header, without the snaps info
1712  */
1713 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1714 {
1715         int ret;
1716         struct rbd_image_header h;
1717
1718         ret = rbd_read_header(rbd_dev, &h);
1719         if (ret < 0)
1720                 return ret;
1721
1722         down_write(&rbd_dev->header_rwsem);
1723
1724         /* resized? */
1725         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1726                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1727
1728                 dout("setting size to %llu sectors", (unsigned long long) size);
1729                 set_capacity(rbd_dev->disk, size);
1730         }
1731
1732         /* rbd_dev->header.object_prefix shouldn't change */
1733         kfree(rbd_dev->header.snap_sizes);
1734         kfree(rbd_dev->header.snap_names);
1735         /* osd requests may still refer to snapc */
1736         ceph_put_snap_context(rbd_dev->header.snapc);
1737
1738         rbd_dev->header.obj_version = h.obj_version;
1739         rbd_dev->header.image_size = h.image_size;
1740         rbd_dev->header.total_snaps = h.total_snaps;
1741         rbd_dev->header.snapc = h.snapc;
1742         rbd_dev->header.snap_names = h.snap_names;
1743         rbd_dev->header.snap_names_len = h.snap_names_len;
1744         rbd_dev->header.snap_sizes = h.snap_sizes;
1745         /* Free the extra copy of the object prefix */
1746         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1747         kfree(h.object_prefix);
1748
1749         ret = __rbd_init_snaps_header(rbd_dev);
1750
1751         up_write(&rbd_dev->header_rwsem);
1752
1753         return ret;
1754 }
1755
1756 static int rbd_init_disk(struct rbd_device *rbd_dev)
1757 {
1758         struct gendisk *disk;
1759         struct request_queue *q;
1760         int rc;
1761         u64 segment_size;
1762         u64 total_size = 0;
1763
1764         /* contact OSD, request size info about the object being mapped */
1765         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1766         if (rc)
1767                 return rc;
1768
1769         /* no need to lock here, as rbd_dev is not registered yet */
1770         rc = __rbd_init_snaps_header(rbd_dev);
1771         if (rc)
1772                 return rc;
1773
1774         rc = rbd_header_set_snap(rbd_dev, &total_size);
1775         if (rc)
1776                 return rc;
1777
1778         /* create gendisk info */
1779         rc = -ENOMEM;
1780         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1781         if (!disk)
1782                 goto out;
1783
1784         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1785                  rbd_dev->id);
1786         disk->major = rbd_dev->major;
1787         disk->first_minor = 0;
1788         disk->fops = &rbd_bd_ops;
1789         disk->private_data = rbd_dev;
1790
1791         /* init rq */
1792         rc = -ENOMEM;
1793         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1794         if (!q)
1795                 goto out_disk;
1796
1797         /* We use the default size, but let's be explicit about it. */
1798         blk_queue_physical_block_size(q, SECTOR_SIZE);
1799
1800         /* set io sizes to object size */
1801         segment_size = rbd_obj_bytes(&rbd_dev->header);
1802         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1803         blk_queue_max_segment_size(q, segment_size);
1804         blk_queue_io_min(q, segment_size);
1805         blk_queue_io_opt(q, segment_size);
1806
1807         blk_queue_merge_bvec(q, rbd_merge_bvec);
1808         disk->queue = q;
1809
1810         q->queuedata = rbd_dev;
1811
1812         rbd_dev->disk = disk;
1813         rbd_dev->q = q;
1814
1815         /* finally, announce the disk to the world */
1816         set_capacity(disk, total_size / SECTOR_SIZE);
1817         add_disk(disk);
1818
1819         pr_info("%s: added with size 0x%llx\n",
1820                 disk->disk_name, (unsigned long long)total_size);
1821         return 0;
1822
1823 out_disk:
1824         put_disk(disk);
1825 out:
1826         return rc;
1827 }
1828
1829 /*
1830   sysfs
1831 */
1832
1833 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1834 {
1835         return container_of(dev, struct rbd_device, dev);
1836 }
1837
1838 static ssize_t rbd_size_show(struct device *dev,
1839                              struct device_attribute *attr, char *buf)
1840 {
1841         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1842         sector_t size;
1843
1844         down_read(&rbd_dev->header_rwsem);
1845         size = get_capacity(rbd_dev->disk);
1846         up_read(&rbd_dev->header_rwsem);
1847
1848         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1849 }
1850
1851 static ssize_t rbd_major_show(struct device *dev,
1852                               struct device_attribute *attr, char *buf)
1853 {
1854         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1855
1856         return sprintf(buf, "%d\n", rbd_dev->major);
1857 }
1858
1859 static ssize_t rbd_client_id_show(struct device *dev,
1860                                   struct device_attribute *attr, char *buf)
1861 {
1862         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1863
1864         return sprintf(buf, "client%lld\n",
1865                         ceph_client_id(rbd_dev->rbd_client->client));
1866 }
1867
1868 static ssize_t rbd_pool_show(struct device *dev,
1869                              struct device_attribute *attr, char *buf)
1870 {
1871         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1872
1873         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1874 }
1875
1876 static ssize_t rbd_pool_id_show(struct device *dev,
1877                              struct device_attribute *attr, char *buf)
1878 {
1879         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1880
1881         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1882 }
1883
1884 static ssize_t rbd_name_show(struct device *dev,
1885                              struct device_attribute *attr, char *buf)
1886 {
1887         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1888
1889         return sprintf(buf, "%s\n", rbd_dev->image_name);
1890 }
1891
1892 static ssize_t rbd_snap_show(struct device *dev,
1893                              struct device_attribute *attr,
1894                              char *buf)
1895 {
1896         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1897
1898         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1899 }
1900
1901 static ssize_t rbd_image_refresh(struct device *dev,
1902                                  struct device_attribute *attr,
1903                                  const char *buf,
1904                                  size_t size)
1905 {
1906         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1907         int rc;
1908         int ret = size;
1909
1910         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1911
1912         rc = __rbd_refresh_header(rbd_dev);
1913         if (rc < 0)
1914                 ret = rc;
1915
1916         mutex_unlock(&ctl_mutex);
1917         return ret;
1918 }
1919
1920 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1921 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1922 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1923 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1924 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1925 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1926 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1927 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1928 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1929
1930 static struct attribute *rbd_attrs[] = {
1931         &dev_attr_size.attr,
1932         &dev_attr_major.attr,
1933         &dev_attr_client_id.attr,
1934         &dev_attr_pool.attr,
1935         &dev_attr_pool_id.attr,
1936         &dev_attr_name.attr,
1937         &dev_attr_current_snap.attr,
1938         &dev_attr_refresh.attr,
1939         &dev_attr_create_snap.attr,
1940         NULL
1941 };
1942
1943 static struct attribute_group rbd_attr_group = {
1944         .attrs = rbd_attrs,
1945 };
1946
1947 static const struct attribute_group *rbd_attr_groups[] = {
1948         &rbd_attr_group,
1949         NULL
1950 };
1951
1952 static void rbd_sysfs_dev_release(struct device *dev)
1953 {
1954 }
1955
1956 static struct device_type rbd_device_type = {
1957         .name           = "rbd",
1958         .groups         = rbd_attr_groups,
1959         .release        = rbd_sysfs_dev_release,
1960 };
1961
1962
1963 /*
1964   sysfs - snapshots
1965 */
1966
1967 static ssize_t rbd_snap_size_show(struct device *dev,
1968                                   struct device_attribute *attr,
1969                                   char *buf)
1970 {
1971         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972
1973         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1974 }
1975
1976 static ssize_t rbd_snap_id_show(struct device *dev,
1977                                 struct device_attribute *attr,
1978                                 char *buf)
1979 {
1980         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981
1982         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1983 }
1984
1985 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1986 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1987
1988 static struct attribute *rbd_snap_attrs[] = {
1989         &dev_attr_snap_size.attr,
1990         &dev_attr_snap_id.attr,
1991         NULL,
1992 };
1993
1994 static struct attribute_group rbd_snap_attr_group = {
1995         .attrs = rbd_snap_attrs,
1996 };
1997
1998 static void rbd_snap_dev_release(struct device *dev)
1999 {
2000         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001         kfree(snap->name);
2002         kfree(snap);
2003 }
2004
2005 static const struct attribute_group *rbd_snap_attr_groups[] = {
2006         &rbd_snap_attr_group,
2007         NULL
2008 };
2009
2010 static struct device_type rbd_snap_device_type = {
2011         .groups         = rbd_snap_attr_groups,
2012         .release        = rbd_snap_dev_release,
2013 };
2014
2015 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2016                                   struct rbd_snap *snap)
2017 {
2018         list_del(&snap->node);
2019         device_unregister(&snap->dev);
2020 }
2021
2022 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2023                                   struct rbd_snap *snap,
2024                                   struct device *parent)
2025 {
2026         struct device *dev = &snap->dev;
2027         int ret;
2028
2029         dev->type = &rbd_snap_device_type;
2030         dev->parent = parent;
2031         dev->release = rbd_snap_dev_release;
2032         dev_set_name(dev, "snap_%s", snap->name);
2033         ret = device_register(dev);
2034
2035         return ret;
2036 }
2037
2038 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2039                               int i, const char *name,
2040                               struct rbd_snap **snapp)
2041 {
2042         int ret;
2043         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2044         if (!snap)
2045                 return -ENOMEM;
2046         snap->name = kstrdup(name, GFP_KERNEL);
2047         snap->size = rbd_dev->header.snap_sizes[i];
2048         snap->id = rbd_dev->header.snapc->snaps[i];
2049         if (device_is_registered(&rbd_dev->dev)) {
2050                 ret = rbd_register_snap_dev(rbd_dev, snap,
2051                                              &rbd_dev->dev);
2052                 if (ret < 0)
2053                         goto err;
2054         }
2055         *snapp = snap;
2056         return 0;
2057 err:
2058         kfree(snap->name);
2059         kfree(snap);
2060         return ret;
2061 }
2062
2063 /*
2064  * search for the previous snap in a null delimited string list
2065  */
2066 const char *rbd_prev_snap_name(const char *name, const char *start)
2067 {
2068         if (name < start + 2)
2069                 return NULL;
2070
2071         name -= 2;
2072         while (*name) {
2073                 if (name == start)
2074                         return start;
2075                 name--;
2076         }
2077         return name + 1;
2078 }
2079
2080 /*
2081  * compare the old list of snapshots that we have to what's in the header
2082  * and update it accordingly. Note that the header holds the snapshots
2083  * in a reverse order (from newest to oldest) and we need to go from
2084  * older to new so that we don't get a duplicate snap name when
2085  * doing the process (e.g., removed snapshot and recreated a new
2086  * one with the same name.
2087  */
2088 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2089 {
2090         const char *name, *first_name;
2091         int i = rbd_dev->header.total_snaps;
2092         struct rbd_snap *snap, *old_snap = NULL;
2093         int ret;
2094         struct list_head *p, *n;
2095
2096         first_name = rbd_dev->header.snap_names;
2097         name = first_name + rbd_dev->header.snap_names_len;
2098
2099         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2100                 u64 cur_id;
2101
2102                 old_snap = list_entry(p, struct rbd_snap, node);
2103
2104                 if (i)
2105                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2106
2107                 if (!i || old_snap->id < cur_id) {
2108                         /*
2109                          * old_snap->id was skipped, thus was
2110                          * removed.  If this rbd_dev is mapped to
2111                          * the removed snapshot, record that it no
2112                          * longer exists, to prevent further I/O.
2113                          */
2114                         if (rbd_dev->snap_id == old_snap->id)
2115                                 rbd_dev->snap_exists = false;
2116                         __rbd_remove_snap_dev(rbd_dev, old_snap);
2117                         continue;
2118                 }
2119                 if (old_snap->id == cur_id) {
2120                         /* we have this snapshot already */
2121                         i--;
2122                         name = rbd_prev_snap_name(name, first_name);
2123                         continue;
2124                 }
2125                 for (; i > 0;
2126                      i--, name = rbd_prev_snap_name(name, first_name)) {
2127                         if (!name) {
2128                                 WARN_ON(1);
2129                                 return -EINVAL;
2130                         }
2131                         cur_id = rbd_dev->header.snapc->snaps[i];
2132                         /* snapshot removal? handle it above */
2133                         if (cur_id >= old_snap->id)
2134                                 break;
2135                         /* a new snapshot */
2136                         ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2137                         if (ret < 0)
2138                                 return ret;
2139
2140                         /* note that we add it backward so using n and not p */
2141                         list_add(&snap->node, n);
2142                         p = &snap->node;
2143                 }
2144         }
2145         /* we're done going over the old snap list, just add what's left */
2146         for (; i > 0; i--) {
2147                 name = rbd_prev_snap_name(name, first_name);
2148                 if (!name) {
2149                         WARN_ON(1);
2150                         return -EINVAL;
2151                 }
2152                 ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
2153                 if (ret < 0)
2154                         return ret;
2155                 list_add(&snap->node, &rbd_dev->snaps);
2156         }
2157
2158         return 0;
2159 }
2160
2161 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2162 {
2163         int ret;
2164         struct device *dev;
2165         struct rbd_snap *snap;
2166
2167         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2168         dev = &rbd_dev->dev;
2169
2170         dev->bus = &rbd_bus_type;
2171         dev->type = &rbd_device_type;
2172         dev->parent = &rbd_root_dev;
2173         dev->release = rbd_dev_release;
2174         dev_set_name(dev, "%d", rbd_dev->id);
2175         ret = device_register(dev);
2176         if (ret < 0)
2177                 goto out;
2178
2179         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2180                 ret = rbd_register_snap_dev(rbd_dev, snap,
2181                                              &rbd_dev->dev);
2182                 if (ret < 0)
2183                         break;
2184         }
2185 out:
2186         mutex_unlock(&ctl_mutex);
2187         return ret;
2188 }
2189
2190 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2191 {
2192         device_unregister(&rbd_dev->dev);
2193 }
2194
2195 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2196 {
2197         int ret, rc;
2198
2199         do {
2200                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->header_name,
2201                                          rbd_dev->header.obj_version);
2202                 if (ret == -ERANGE) {
2203                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2204                         rc = __rbd_refresh_header(rbd_dev);
2205                         mutex_unlock(&ctl_mutex);
2206                         if (rc < 0)
2207                                 return rc;
2208                 }
2209         } while (ret == -ERANGE);
2210
2211         return ret;
2212 }
2213
2214 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215
2216 /*
2217  * Get a unique rbd identifier for the given new rbd_dev, and add
2218  * the rbd_dev to the global list.  The minimum rbd id is 1.
2219  */
2220 static void rbd_id_get(struct rbd_device *rbd_dev)
2221 {
2222         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2223
2224         spin_lock(&rbd_dev_list_lock);
2225         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226         spin_unlock(&rbd_dev_list_lock);
2227 }
2228
2229 /*
2230  * Remove an rbd_dev from the global list, and record that its
2231  * identifier is no longer in use.
2232  */
2233 static void rbd_id_put(struct rbd_device *rbd_dev)
2234 {
2235         struct list_head *tmp;
2236         int rbd_id = rbd_dev->id;
2237         int max_id;
2238
2239         BUG_ON(rbd_id < 1);
2240
2241         spin_lock(&rbd_dev_list_lock);
2242         list_del_init(&rbd_dev->node);
2243
2244         /*
2245          * If the id being "put" is not the current maximum, there
2246          * is nothing special we need to do.
2247          */
2248         if (rbd_id != atomic64_read(&rbd_id_max)) {
2249                 spin_unlock(&rbd_dev_list_lock);
2250                 return;
2251         }
2252
2253         /*
2254          * We need to update the current maximum id.  Search the
2255          * list to find out what it is.  We're more likely to find
2256          * the maximum at the end, so search the list backward.
2257          */
2258         max_id = 0;
2259         list_for_each_prev(tmp, &rbd_dev_list) {
2260                 struct rbd_device *rbd_dev;
2261
2262                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2263                 if (rbd_id > max_id)
2264                         max_id = rbd_id;
2265         }
2266         spin_unlock(&rbd_dev_list_lock);
2267
2268         /*
2269          * The max id could have been updated by rbd_id_get(), in
2270          * which case it now accurately reflects the new maximum.
2271          * Be careful not to overwrite the maximum value in that
2272          * case.
2273          */
2274         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2275 }
2276
2277 /*
2278  * Skips over white space at *buf, and updates *buf to point to the
2279  * first found non-space character (if any). Returns the length of
2280  * the token (string of non-white space characters) found.  Note
2281  * that *buf must be terminated with '\0'.
2282  */
2283 static inline size_t next_token(const char **buf)
2284 {
2285         /*
2286         * These are the characters that produce nonzero for
2287         * isspace() in the "C" and "POSIX" locales.
2288         */
2289         const char *spaces = " \f\n\r\t\v";
2290
2291         *buf += strspn(*buf, spaces);   /* Find start of token */
2292
2293         return strcspn(*buf, spaces);   /* Return token length */
2294 }
2295
2296 /*
2297  * Finds the next token in *buf, and if the provided token buffer is
2298  * big enough, copies the found token into it.  The result, if
2299  * copied, is guaranteed to be terminated with '\0'.  Note that *buf
2300  * must be terminated with '\0' on entry.
2301  *
2302  * Returns the length of the token found (not including the '\0').
2303  * Return value will be 0 if no token is found, and it will be >=
2304  * token_size if the token would not fit.
2305  *
2306  * The *buf pointer will be updated to point beyond the end of the
2307  * found token.  Note that this occurs even if the token buffer is
2308  * too small to hold it.
2309  */
2310 static inline size_t copy_token(const char **buf,
2311                                 char *token,
2312                                 size_t token_size)
2313 {
2314         size_t len;
2315
2316         len = next_token(buf);
2317         if (len < token_size) {
2318                 memcpy(token, *buf, len);
2319                 *(token + len) = '\0';
2320         }
2321         *buf += len;
2322
2323         return len;
2324 }
2325
2326 /*
2327  * Finds the next token in *buf, dynamically allocates a buffer big
2328  * enough to hold a copy of it, and copies the token into the new
2329  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2330  * that a duplicate buffer is created even for a zero-length token.
2331  *
2332  * Returns a pointer to the newly-allocated duplicate, or a null
2333  * pointer if memory for the duplicate was not available.  If
2334  * the lenp argument is a non-null pointer, the length of the token
2335  * (not including the '\0') is returned in *lenp.
2336  *
2337  * If successful, the *buf pointer will be updated to point beyond
2338  * the end of the found token.
2339  *
2340  * Note: uses GFP_KERNEL for allocation.
2341  */
2342 static inline char *dup_token(const char **buf, size_t *lenp)
2343 {
2344         char *dup;
2345         size_t len;
2346
2347         len = next_token(buf);
2348         dup = kmalloc(len + 1, GFP_KERNEL);
2349         if (!dup)
2350                 return NULL;
2351
2352         memcpy(dup, *buf, len);
2353         *(dup + len) = '\0';
2354         *buf += len;
2355
2356         if (lenp)
2357                 *lenp = len;
2358
2359         return dup;
2360 }
2361
2362 /*
2363  * This fills in the pool_name, image_name, image_name_len, snap_name,
2364  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2365  * on the list of monitor addresses and other options provided via
2366  * /sys/bus/rbd/add.
2367  *
2368  * Note: rbd_dev is assumed to have been initially zero-filled.
2369  */
2370 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2371                               const char *buf,
2372                               const char **mon_addrs,
2373                               size_t *mon_addrs_size,
2374                               char *options,
2375                              size_t options_size)
2376 {
2377         size_t len;
2378         int ret;
2379
2380         /* The first four tokens are required */
2381
2382         len = next_token(&buf);
2383         if (!len)
2384                 return -EINVAL;
2385         *mon_addrs_size = len + 1;
2386         *mon_addrs = buf;
2387
2388         buf += len;
2389
2390         len = copy_token(&buf, options, options_size);
2391         if (!len || len >= options_size)
2392                 return -EINVAL;
2393
2394         ret = -ENOMEM;
2395         rbd_dev->pool_name = dup_token(&buf, NULL);
2396         if (!rbd_dev->pool_name)
2397                 goto out_err;
2398
2399         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2400         if (!rbd_dev->image_name)
2401                 goto out_err;
2402
2403         /* Create the name of the header object */
2404
2405         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2406                                                 + sizeof (RBD_SUFFIX),
2407                                         GFP_KERNEL);
2408         if (!rbd_dev->header_name)
2409                 goto out_err;
2410         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2411
2412         /*
2413          * The snapshot name is optional.  If none is is supplied,
2414          * we use the default value.
2415          */
2416         rbd_dev->snap_name = dup_token(&buf, &len);
2417         if (!rbd_dev->snap_name)
2418                 goto out_err;
2419         if (!len) {
2420                 /* Replace the empty name with the default */
2421                 kfree(rbd_dev->snap_name);
2422                 rbd_dev->snap_name
2423                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2424                 if (!rbd_dev->snap_name)
2425                         goto out_err;
2426
2427                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2428                         sizeof (RBD_SNAP_HEAD_NAME));
2429         }
2430
2431         return 0;
2432
2433 out_err:
2434         kfree(rbd_dev->header_name);
2435         kfree(rbd_dev->image_name);
2436         kfree(rbd_dev->pool_name);
2437         rbd_dev->pool_name = NULL;
2438
2439         return ret;
2440 }
2441
2442 static ssize_t rbd_add(struct bus_type *bus,
2443                        const char *buf,
2444                        size_t count)
2445 {
2446         char *options;
2447         struct rbd_device *rbd_dev = NULL;
2448         const char *mon_addrs = NULL;
2449         size_t mon_addrs_size = 0;
2450         struct ceph_osd_client *osdc;
2451         int rc = -ENOMEM;
2452
2453         if (!try_module_get(THIS_MODULE))
2454                 return -ENODEV;
2455
2456         options = kmalloc(count, GFP_KERNEL);
2457         if (!options)
2458                 goto err_nomem;
2459         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460         if (!rbd_dev)
2461                 goto err_nomem;
2462
2463         /* static rbd_device initialization */
2464         spin_lock_init(&rbd_dev->lock);
2465         INIT_LIST_HEAD(&rbd_dev->node);
2466         INIT_LIST_HEAD(&rbd_dev->snaps);
2467         init_rwsem(&rbd_dev->header_rwsem);
2468
2469         init_rwsem(&rbd_dev->header_rwsem);
2470
2471         /* generate unique id: find highest unique id, add one */
2472         rbd_id_get(rbd_dev);
2473
2474         /* Fill in the device name, now that we have its id. */
2475         BUILD_BUG_ON(DEV_NAME_LEN
2476                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2477         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2478
2479         /* parse add command */
2480         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2481                                 options, count);
2482         if (rc)
2483                 goto err_put_id;
2484
2485         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2486                                                 options);
2487         if (IS_ERR(rbd_dev->rbd_client)) {
2488                 rc = PTR_ERR(rbd_dev->rbd_client);
2489                 goto err_put_id;
2490         }
2491
2492         /* pick the pool */
2493         osdc = &rbd_dev->rbd_client->client->osdc;
2494         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2495         if (rc < 0)
2496                 goto err_out_client;
2497         rbd_dev->pool_id = rc;
2498
2499         /* register our block device */
2500         rc = register_blkdev(0, rbd_dev->name);
2501         if (rc < 0)
2502                 goto err_out_client;
2503         rbd_dev->major = rc;
2504
2505         rc = rbd_bus_add_dev(rbd_dev);
2506         if (rc)
2507                 goto err_out_blkdev;
2508
2509         /*
2510          * At this point cleanup in the event of an error is the job
2511          * of the sysfs code (initiated by rbd_bus_del_dev()).
2512          *
2513          * Set up and announce blkdev mapping.
2514          */
2515         rc = rbd_init_disk(rbd_dev);
2516         if (rc)
2517                 goto err_out_bus;
2518
2519         rc = rbd_init_watch_dev(rbd_dev);
2520         if (rc)
2521                 goto err_out_bus;
2522
2523         return count;
2524
2525 err_out_bus:
2526         /* this will also clean up rest of rbd_dev stuff */
2527
2528         rbd_bus_del_dev(rbd_dev);
2529         kfree(options);
2530         return rc;
2531
2532 err_out_blkdev:
2533         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2534 err_out_client:
2535         rbd_put_client(rbd_dev);
2536 err_put_id:
2537         if (rbd_dev->pool_name) {
2538                 kfree(rbd_dev->snap_name);
2539                 kfree(rbd_dev->header_name);
2540                 kfree(rbd_dev->image_name);
2541                 kfree(rbd_dev->pool_name);
2542         }
2543         rbd_id_put(rbd_dev);
2544 err_nomem:
2545         kfree(rbd_dev);
2546         kfree(options);
2547
2548         dout("Error adding device %s\n", buf);
2549         module_put(THIS_MODULE);
2550
2551         return (ssize_t) rc;
2552 }
2553
2554 static struct rbd_device *__rbd_get_dev(unsigned long id)
2555 {
2556         struct list_head *tmp;
2557         struct rbd_device *rbd_dev;
2558
2559         spin_lock(&rbd_dev_list_lock);
2560         list_for_each(tmp, &rbd_dev_list) {
2561                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2562                 if (rbd_dev->id == id) {
2563                         spin_unlock(&rbd_dev_list_lock);
2564                         return rbd_dev;
2565                 }
2566         }
2567         spin_unlock(&rbd_dev_list_lock);
2568         return NULL;
2569 }
2570
2571 static void rbd_dev_release(struct device *dev)
2572 {
2573         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2574
2575         if (rbd_dev->watch_request) {
2576                 struct ceph_client *client = rbd_dev->rbd_client->client;
2577
2578                 ceph_osdc_unregister_linger_request(&client->osdc,
2579                                                     rbd_dev->watch_request);
2580         }
2581         if (rbd_dev->watch_event)
2582                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->header_name);
2583
2584         rbd_put_client(rbd_dev);
2585
2586         /* clean up and free blkdev */
2587         rbd_free_disk(rbd_dev);
2588         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2589
2590         /* done with the id, and with the rbd_dev */
2591         kfree(rbd_dev->snap_name);
2592         kfree(rbd_dev->header_name);
2593         kfree(rbd_dev->pool_name);
2594         kfree(rbd_dev->image_name);
2595         rbd_id_put(rbd_dev);
2596         kfree(rbd_dev);
2597
2598         /* release module ref */
2599         module_put(THIS_MODULE);
2600 }
2601
2602 static ssize_t rbd_remove(struct bus_type *bus,
2603                           const char *buf,
2604                           size_t count)
2605 {
2606         struct rbd_device *rbd_dev = NULL;
2607         int target_id, rc;
2608         unsigned long ul;
2609         int ret = count;
2610
2611         rc = strict_strtoul(buf, 10, &ul);
2612         if (rc)
2613                 return rc;
2614
2615         /* convert to int; abort if we lost anything in the conversion */
2616         target_id = (int) ul;
2617         if (target_id != ul)
2618                 return -EINVAL;
2619
2620         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2621
2622         rbd_dev = __rbd_get_dev(target_id);
2623         if (!rbd_dev) {
2624                 ret = -ENOENT;
2625                 goto done;
2626         }
2627
2628         __rbd_remove_all_snaps(rbd_dev);
2629         rbd_bus_del_dev(rbd_dev);
2630
2631 done:
2632         mutex_unlock(&ctl_mutex);
2633         return ret;
2634 }
2635
2636 static ssize_t rbd_snap_add(struct device *dev,
2637                             struct device_attribute *attr,
2638                             const char *buf,
2639                             size_t count)
2640 {
2641         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2642         int ret;
2643         char *name = kmalloc(count + 1, GFP_KERNEL);
2644         if (!name)
2645                 return -ENOMEM;
2646
2647         snprintf(name, count, "%s", buf);
2648
2649         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2650
2651         ret = rbd_header_add_snap(rbd_dev,
2652                                   name, GFP_KERNEL);
2653         if (ret < 0)
2654                 goto err_unlock;
2655
2656         ret = __rbd_refresh_header(rbd_dev);
2657         if (ret < 0)
2658                 goto err_unlock;
2659
2660         /* shouldn't hold ctl_mutex when notifying.. notify might
2661            trigger a watch callback that would need to get that mutex */
2662         mutex_unlock(&ctl_mutex);
2663
2664         /* make a best effort, don't error if failed */
2665         rbd_req_sync_notify(rbd_dev, rbd_dev->header_name);
2666
2667         ret = count;
2668         kfree(name);
2669         return ret;
2670
2671 err_unlock:
2672         mutex_unlock(&ctl_mutex);
2673         kfree(name);
2674         return ret;
2675 }
2676
2677 /*
2678  * create control files in sysfs
2679  * /sys/bus/rbd/...
2680  */
2681 static int rbd_sysfs_init(void)
2682 {
2683         int ret;
2684
2685         ret = device_register(&rbd_root_dev);
2686         if (ret < 0)
2687                 return ret;
2688
2689         ret = bus_register(&rbd_bus_type);
2690         if (ret < 0)
2691                 device_unregister(&rbd_root_dev);
2692
2693         return ret;
2694 }
2695
2696 static void rbd_sysfs_cleanup(void)
2697 {
2698         bus_unregister(&rbd_bus_type);
2699         device_unregister(&rbd_root_dev);
2700 }
2701
2702 int __init rbd_init(void)
2703 {
2704         int rc;
2705
2706         rc = rbd_sysfs_init();
2707         if (rc)
2708                 return rc;
2709         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2710         return 0;
2711 }
2712
2713 void __exit rbd_exit(void)
2714 {
2715         rbd_sysfs_cleanup();
2716 }
2717
2718 module_init(rbd_init);
2719 module_exit(rbd_exit);
2720
2721 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2722 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2723 MODULE_DESCRIPTION("rados block device");
2724
2725 /* following authorship retained from original osdblk.c */
2726 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2727
2728 MODULE_LICENSE("GPL");