Merge tag 'stable/for-linus-3.6-rc7-tag' of git://git.kernel.org/pub/scm/linux/kernel...
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O is expressed in sectors.  Several Linux layers (blk, bio,
 * genhd) interpret that unit, and all of them default to 512 bytes;
 * these names just read better than the bare numbers.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)

/* Short and long driver names */
#define RBD_DRV_NAME "rbd"
#define RBD_DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name meaning "no snapshot" (see rbd_header_set_snap()) */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * RBD device names have the form "rbd#": RBD_DRV_NAME followed by a
 * unique integer id.  MAX_INT_FORMAT_WIDTH is used in making sure
 * DEV_NAME_LEN is large enough for every possible device name.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

/* Initial value of rbd_options.notify_timeout */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u32 total_snaps;
86
87         char *snap_names;
88         u64 *snap_sizes;
89
90         u64 obj_version;
91 };
92
/* rbd-specific options, filled in by parse_rbd_opts_token() */
struct rbd_options {
	int	notify_timeout;	/* starts as RBD_NOTIFY_TIMEOUT_DEFAULT */
};
96
97 /*
98  * an instance of the client.  multiple devices may share an rbd client.
99  */
100 struct rbd_client {
101         struct ceph_client      *client;
102         struct rbd_options      *rbd_opts;
103         struct kref             kref;
104         struct list_head        node;
105 };
106
107 /*
108  * a request completion status
109  */
110 struct rbd_req_status {
111         int done;
112         int rc;
113         u64 bytes;
114 };
115
116 /*
117  * a collection of requests
118  */
119 struct rbd_req_coll {
120         int                     total;
121         int                     num_done;
122         struct kref             kref;
123         struct rbd_req_status   status[0];
124 };
125
126 /*
127  * a single io request
128  */
129 struct rbd_request {
130         struct request          *rq;            /* blk layer request */
131         struct bio              *bio;           /* cloned bio */
132         struct page             **pages;        /* list of used pages */
133         u64                     len;
134         int                     coll_index;
135         struct rbd_req_coll     *coll;
136 };
137
138 struct rbd_snap {
139         struct  device          dev;
140         const char              *name;
141         u64                     size;
142         struct list_head        node;
143         u64                     id;
144 };
145
146 /*
147  * a single device
148  */
149 struct rbd_device {
150         int                     dev_id;         /* blkdev unique id */
151
152         int                     major;          /* blkdev assigned major */
153         struct gendisk          *disk;          /* blkdev's gendisk and rq */
154         struct request_queue    *q;
155
156         struct rbd_client       *rbd_client;
157
158         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
159
160         spinlock_t              lock;           /* queue lock */
161
162         struct rbd_image_header header;
163         char                    *image_name;
164         size_t                  image_name_len;
165         char                    *header_name;
166         char                    *pool_name;
167         int                     pool_id;
168
169         struct ceph_osd_event   *watch_event;
170         struct ceph_osd_request *watch_request;
171
172         /* protects updating the header */
173         struct rw_semaphore     header_rwsem;
174         /* name of the snapshot this device reads from */
175         char                    *snap_name;
176         /* id of the snapshot this device reads from */
177         u64                     snap_id;        /* current snapshot id */
178         /* whether the snap_id this device reads from still exists */
179         bool                    snap_exists;
180         int                     read_only;
181
182         struct list_head        node;
183
184         /* list of snapshots */
185         struct list_head        snaps;
186
187         /* sysfs related */
188         struct device           dev;
189 };
190
/* Serialize open/close/setup/teardown */
static DEFINE_MUTEX(ctl_mutex);

/* devices; presumably guarded by rbd_dev_list_lock -- TODO confirm */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* clients; walked and modified under rbd_client_list_lock */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
198
199 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
200 static void rbd_dev_release(struct device *dev);
201 static ssize_t rbd_snap_add(struct device *dev,
202                             struct device_attribute *attr,
203                             const char *buf,
204                             size_t count);
205 static void __rbd_remove_snap_dev(struct rbd_snap *snap);
206
207 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
208                        size_t count);
209 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
210                           size_t count);
211
212 static struct bus_attribute rbd_bus_attrs[] = {
213         __ATTR(add, S_IWUSR, NULL, rbd_add),
214         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
215         __ATTR_NULL
216 };
217
218 static struct bus_type rbd_bus_type = {
219         .name           = "rbd",
220         .bus_attrs      = rbd_bus_attrs,
221 };
222
/*
 * Release callback for rbd_root_dev.  Intentionally empty: the root
 * device is a static object, so there is nothing to free.
 */
static void rbd_root_dev_release(struct device *dev)
{
}
226
227 static struct device rbd_root_dev = {
228         .init_name =    "rbd",
229         .release =      rbd_root_dev_release,
230 };
231
232
233 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
234 {
235         return get_device(&rbd_dev->dev);
236 }
237
238 static void rbd_put_dev(struct rbd_device *rbd_dev)
239 {
240         put_device(&rbd_dev->dev);
241 }
242
243 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
244
245 static int rbd_open(struct block_device *bdev, fmode_t mode)
246 {
247         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
248
249         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
250                 return -EROFS;
251
252         rbd_get_dev(rbd_dev);
253         set_device_ro(bdev, rbd_dev->read_only);
254
255         return 0;
256 }
257
258 static int rbd_release(struct gendisk *disk, fmode_t mode)
259 {
260         struct rbd_device *rbd_dev = disk->private_data;
261
262         rbd_put_dev(rbd_dev);
263
264         return 0;
265 }
266
267 static const struct block_device_operations rbd_bd_ops = {
268         .owner                  = THIS_MODULE,
269         .open                   = rbd_open,
270         .release                = rbd_release,
271 };
272
273 /*
274  * Initialize an rbd client instance.
275  * We own *ceph_opts.
276  */
277 static struct rbd_client *rbd_client_create(struct ceph_options *ceph_opts,
278                                             struct rbd_options *rbd_opts)
279 {
280         struct rbd_client *rbdc;
281         int ret = -ENOMEM;
282
283         dout("rbd_client_create\n");
284         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
285         if (!rbdc)
286                 goto out_opt;
287
288         kref_init(&rbdc->kref);
289         INIT_LIST_HEAD(&rbdc->node);
290
291         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
292
293         rbdc->client = ceph_create_client(ceph_opts, rbdc, 0, 0);
294         if (IS_ERR(rbdc->client))
295                 goto out_mutex;
296         ceph_opts = NULL; /* Now rbdc->client is responsible for ceph_opts */
297
298         ret = ceph_open_session(rbdc->client);
299         if (ret < 0)
300                 goto out_err;
301
302         rbdc->rbd_opts = rbd_opts;
303
304         spin_lock(&rbd_client_list_lock);
305         list_add_tail(&rbdc->node, &rbd_client_list);
306         spin_unlock(&rbd_client_list_lock);
307
308         mutex_unlock(&ctl_mutex);
309
310         dout("rbd_client_create created %p\n", rbdc);
311         return rbdc;
312
313 out_err:
314         ceph_destroy_client(rbdc->client);
315 out_mutex:
316         mutex_unlock(&ctl_mutex);
317         kfree(rbdc);
318 out_opt:
319         if (ceph_opts)
320                 ceph_destroy_options(ceph_opts);
321         return ERR_PTR(ret);
322 }
323
324 /*
325  * Find a ceph client with specific addr and configuration.
326  */
327 static struct rbd_client *__rbd_client_find(struct ceph_options *ceph_opts)
328 {
329         struct rbd_client *client_node;
330
331         if (ceph_opts->flags & CEPH_OPT_NOSHARE)
332                 return NULL;
333
334         list_for_each_entry(client_node, &rbd_client_list, node)
335                 if (!ceph_compare_options(ceph_opts, client_node->client))
336                         return client_node;
337         return NULL;
338 }
339
/*
 * Mount option tokens.  Tokens below Opt_last_int take an integer
 * argument; tokens between Opt_last_int and Opt_last_string take a
 * string argument.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};
350
351 static match_table_t rbd_opts_tokens = {
352         {Opt_notify_timeout, "notify_timeout=%d"},
353         /* int args above */
354         /* string args above */
355         {-1, NULL}
356 };
357
358 static int parse_rbd_opts_token(char *c, void *private)
359 {
360         struct rbd_options *rbd_opts = private;
361         substring_t argstr[MAX_OPT_ARGS];
362         int token, intval, ret;
363
364         token = match_token(c, rbd_opts_tokens, argstr);
365         if (token < 0)
366                 return -EINVAL;
367
368         if (token < Opt_last_int) {
369                 ret = match_int(&argstr[0], &intval);
370                 if (ret < 0) {
371                         pr_err("bad mount option arg (not int) "
372                                "at '%s'\n", c);
373                         return ret;
374                 }
375                 dout("got int token %d val %d\n", token, intval);
376         } else if (token > Opt_last_int && token < Opt_last_string) {
377                 dout("got string token %d val %s\n", token,
378                      argstr[0].from);
379         } else {
380                 dout("got token %d\n", token);
381         }
382
383         switch (token) {
384         case Opt_notify_timeout:
385                 rbd_opts->notify_timeout = intval;
386                 break;
387         default:
388                 BUG_ON(token);
389         }
390         return 0;
391 }
392
393 /*
394  * Get a ceph client with specific addr and configuration, if one does
395  * not exist create it.
396  */
397 static struct rbd_client *rbd_get_client(const char *mon_addr,
398                                          size_t mon_addr_len,
399                                          char *options)
400 {
401         struct rbd_client *rbdc;
402         struct ceph_options *ceph_opts;
403         struct rbd_options *rbd_opts;
404
405         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
406         if (!rbd_opts)
407                 return ERR_PTR(-ENOMEM);
408
409         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
410
411         ceph_opts = ceph_parse_options(options, mon_addr,
412                                         mon_addr + mon_addr_len,
413                                         parse_rbd_opts_token, rbd_opts);
414         if (IS_ERR(ceph_opts)) {
415                 kfree(rbd_opts);
416                 return ERR_CAST(ceph_opts);
417         }
418
419         spin_lock(&rbd_client_list_lock);
420         rbdc = __rbd_client_find(ceph_opts);
421         if (rbdc) {
422                 /* using an existing client */
423                 kref_get(&rbdc->kref);
424                 spin_unlock(&rbd_client_list_lock);
425
426                 ceph_destroy_options(ceph_opts);
427                 kfree(rbd_opts);
428
429                 return rbdc;
430         }
431         spin_unlock(&rbd_client_list_lock);
432
433         rbdc = rbd_client_create(ceph_opts, rbd_opts);
434
435         if (IS_ERR(rbdc))
436                 kfree(rbd_opts);
437
438         return rbdc;
439 }
440
441 /*
442  * Destroy ceph client
443  *
444  * Caller must hold rbd_client_list_lock.
445  */
446 static void rbd_client_release(struct kref *kref)
447 {
448         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
449
450         dout("rbd_release_client %p\n", rbdc);
451         spin_lock(&rbd_client_list_lock);
452         list_del(&rbdc->node);
453         spin_unlock(&rbd_client_list_lock);
454
455         ceph_destroy_client(rbdc->client);
456         kfree(rbdc->rbd_opts);
457         kfree(rbdc);
458 }
459
460 /*
461  * Drop reference to ceph client node. If it's not referenced anymore, release
462  * it.
463  */
464 static void rbd_put_client(struct rbd_device *rbd_dev)
465 {
466         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
467         rbd_dev->rbd_client = NULL;
468 }
469
470 /*
471  * Destroy requests collection
472  */
473 static void rbd_coll_release(struct kref *kref)
474 {
475         struct rbd_req_coll *coll =
476                 container_of(kref, struct rbd_req_coll, kref);
477
478         dout("rbd_coll_release %p\n", coll);
479         kfree(coll);
480 }
481
482 static bool rbd_dev_ondisk_valid(struct rbd_image_header_ondisk *ondisk)
483 {
484         return !memcmp(&ondisk->text,
485                         RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT));
486 }
487
488 /*
489  * Create a new header structure, translate header format from the on-disk
490  * header.
491  */
492 static int rbd_header_from_disk(struct rbd_image_header *header,
493                                  struct rbd_image_header_ondisk *ondisk,
494                                  u32 allocated_snaps)
495 {
496         u32 snap_count;
497
498         if (!rbd_dev_ondisk_valid(ondisk))
499                 return -ENXIO;
500
501         snap_count = le32_to_cpu(ondisk->snap_count);
502         if (snap_count > (SIZE_MAX - sizeof(struct ceph_snap_context))
503                                  / sizeof (u64))
504                 return -EINVAL;
505         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
506                                 snap_count * sizeof(u64),
507                                 GFP_KERNEL);
508         if (!header->snapc)
509                 return -ENOMEM;
510
511         if (snap_count) {
512                 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
513                 header->snap_names = kmalloc(header->snap_names_len,
514                                              GFP_KERNEL);
515                 if (!header->snap_names)
516                         goto err_snapc;
517                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
518                                              GFP_KERNEL);
519                 if (!header->snap_sizes)
520                         goto err_names;
521         } else {
522                 WARN_ON(ondisk->snap_names_len);
523                 header->snap_names_len = 0;
524                 header->snap_names = NULL;
525                 header->snap_sizes = NULL;
526         }
527
528         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
529                                         GFP_KERNEL);
530         if (!header->object_prefix)
531                 goto err_sizes;
532
533         memcpy(header->object_prefix, ondisk->block_name,
534                sizeof(ondisk->block_name));
535         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
536
537         header->image_size = le64_to_cpu(ondisk->image_size);
538         header->obj_order = ondisk->options.order;
539         header->crypt_type = ondisk->options.crypt_type;
540         header->comp_type = ondisk->options.comp_type;
541
542         atomic_set(&header->snapc->nref, 1);
543         header->snapc->seq = le64_to_cpu(ondisk->snap_seq);
544         header->snapc->num_snaps = snap_count;
545         header->total_snaps = snap_count;
546
547         if (snap_count && allocated_snaps == snap_count) {
548                 int i;
549
550                 for (i = 0; i < snap_count; i++) {
551                         header->snapc->snaps[i] =
552                                 le64_to_cpu(ondisk->snaps[i].id);
553                         header->snap_sizes[i] =
554                                 le64_to_cpu(ondisk->snaps[i].image_size);
555                 }
556
557                 /* copy snapshot names */
558                 memcpy(header->snap_names, &ondisk->snaps[snap_count],
559                         header->snap_names_len);
560         }
561
562         return 0;
563
564 err_sizes:
565         kfree(header->snap_sizes);
566         header->snap_sizes = NULL;
567 err_names:
568         kfree(header->snap_names);
569         header->snap_names = NULL;
570 err_snapc:
571         kfree(header->snapc);
572         header->snapc = NULL;
573
574         return -ENOMEM;
575 }
576
577 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
578                         u64 *seq, u64 *size)
579 {
580         int i;
581         char *p = header->snap_names;
582
583         for (i = 0; i < header->total_snaps; i++) {
584                 if (!strcmp(snap_name, p)) {
585
586                         /* Found it.  Pass back its id and/or size */
587
588                         if (seq)
589                                 *seq = header->snapc->snaps[i];
590                         if (size)
591                                 *size = header->snap_sizes[i];
592                         return i;
593                 }
594                 p += strlen(p) + 1;     /* Skip ahead to the next name */
595         }
596         return -ENOENT;
597 }
598
599 static int rbd_header_set_snap(struct rbd_device *rbd_dev, u64 *size)
600 {
601         int ret;
602
603         down_write(&rbd_dev->header_rwsem);
604
605         if (!memcmp(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
606                     sizeof (RBD_SNAP_HEAD_NAME))) {
607                 rbd_dev->snap_id = CEPH_NOSNAP;
608                 rbd_dev->snap_exists = false;
609                 rbd_dev->read_only = 0;
610                 if (size)
611                         *size = rbd_dev->header.image_size;
612         } else {
613                 u64 snap_id = 0;
614
615                 ret = snap_by_name(&rbd_dev->header, rbd_dev->snap_name,
616                                         &snap_id, size);
617                 if (ret < 0)
618                         goto done;
619                 rbd_dev->snap_id = snap_id;
620                 rbd_dev->snap_exists = true;
621                 rbd_dev->read_only = 1;
622         }
623
624         ret = 0;
625 done:
626         up_write(&rbd_dev->header_rwsem);
627         return ret;
628 }
629
630 static void rbd_header_free(struct rbd_image_header *header)
631 {
632         kfree(header->object_prefix);
633         kfree(header->snap_sizes);
634         kfree(header->snap_names);
635         ceph_put_snap_context(header->snapc);
636 }
637
638 /*
639  * get the actual striped segment name, offset and length
640  */
641 static u64 rbd_get_segment(struct rbd_image_header *header,
642                            const char *object_prefix,
643                            u64 ofs, u64 len,
644                            char *seg_name, u64 *segofs)
645 {
646         u64 seg = ofs >> header->obj_order;
647
648         if (seg_name)
649                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
650                          "%s.%012llx", object_prefix, seg);
651
652         ofs = ofs & ((1 << header->obj_order) - 1);
653         len = min_t(u64, len, (1 << header->obj_order) - ofs);
654
655         if (segofs)
656                 *segofs = ofs;
657
658         return len;
659 }
660
661 static int rbd_get_num_segments(struct rbd_image_header *header,
662                                 u64 ofs, u64 len)
663 {
664         u64 start_seg = ofs >> header->obj_order;
665         u64 end_seg = (ofs + len - 1) >> header->obj_order;
666         return end_seg - start_seg + 1;
667 }
668
669 /*
670  * returns the size of an object in the image
671  */
672 static u64 rbd_obj_bytes(struct rbd_image_header *header)
673 {
674         return 1 << header->obj_order;
675 }
676
677 /*
678  * bio helpers
679  */
680
681 static void bio_chain_put(struct bio *chain)
682 {
683         struct bio *tmp;
684
685         while (chain) {
686                 tmp = chain;
687                 chain = chain->bi_next;
688                 bio_put(tmp);
689         }
690 }
691
692 /*
693  * zeros a bio chain, starting at specific offset
694  */
695 static void zero_bio_chain(struct bio *chain, int start_ofs)
696 {
697         struct bio_vec *bv;
698         unsigned long flags;
699         void *buf;
700         int i;
701         int pos = 0;
702
703         while (chain) {
704                 bio_for_each_segment(bv, chain, i) {
705                         if (pos + bv->bv_len > start_ofs) {
706                                 int remainder = max(start_ofs - pos, 0);
707                                 buf = bvec_kmap_irq(bv, &flags);
708                                 memset(buf + remainder, 0,
709                                        bv->bv_len - remainder);
710                                 bvec_kunmap_irq(buf, &flags);
711                         }
712                         pos += bv->bv_len;
713                 }
714
715                 chain = chain->bi_next;
716         }
717 }
718
719 /*
720  * bio_chain_clone - clone a chain of bios up to a certain length.
721  * might return a bio_pair that will need to be released.
722  */
723 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
724                                    struct bio_pair **bp,
725                                    int len, gfp_t gfpmask)
726 {
727         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
728         int total = 0;
729
730         if (*bp) {
731                 bio_pair_release(*bp);
732                 *bp = NULL;
733         }
734
735         while (old_chain && (total < len)) {
736                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
737                 if (!tmp)
738                         goto err_out;
739
740                 if (total + old_chain->bi_size > len) {
741                         struct bio_pair *bp;
742
743                         /*
744                          * this split can only happen with a single paged bio,
745                          * split_bio will BUG_ON if this is not the case
746                          */
747                         dout("bio_chain_clone split! total=%d remaining=%d"
748                              "bi_size=%u\n",
749                              total, len - total, old_chain->bi_size);
750
751                         /* split the bio. We'll release it either in the next
752                            call, or it will have to be released outside */
753                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
754                         if (!bp)
755                                 goto err_out;
756
757                         __bio_clone(tmp, &bp->bio1);
758
759                         *next = &bp->bio2;
760                 } else {
761                         __bio_clone(tmp, old_chain);
762                         *next = old_chain->bi_next;
763                 }
764
765                 tmp->bi_bdev = NULL;
766                 gfpmask &= ~__GFP_WAIT;
767                 tmp->bi_next = NULL;
768
769                 if (!new_chain) {
770                         new_chain = tail = tmp;
771                 } else {
772                         tail->bi_next = tmp;
773                         tail = tmp;
774                 }
775                 old_chain = old_chain->bi_next;
776
777                 total += tmp->bi_size;
778         }
779
780         BUG_ON(total < len);
781
782         if (tail)
783                 tail->bi_next = NULL;
784
785         *old = old_chain;
786
787         return new_chain;
788
789 err_out:
790         dout("bio_chain_clone with err\n");
791         bio_chain_put(new_chain);
792         return NULL;
793 }
794
795 /*
796  * helpers for osd request op vectors.
797  */
798 static struct ceph_osd_req_op *rbd_create_rw_ops(int num_ops,
799                                         int opcode, u32 payload_len)
800 {
801         struct ceph_osd_req_op *ops;
802
803         ops = kzalloc(sizeof (*ops) * (num_ops + 1), GFP_NOIO);
804         if (!ops)
805                 return NULL;
806
807         ops[0].op = opcode;
808
809         /*
810          * op extent offset and length will be set later on
811          * in calc_raw_layout()
812          */
813         ops[0].payload_len = payload_len;
814
815         return ops;
816 }
817
/* Free an op vector obtained from rbd_create_rw_ops(); NULL is fine */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
822
823 static void rbd_coll_end_req_index(struct request *rq,
824                                    struct rbd_req_coll *coll,
825                                    int index,
826                                    int ret, u64 len)
827 {
828         struct request_queue *q;
829         int min, max, i;
830
831         dout("rbd_coll_end_req_index %p index %d ret %d len %llu\n",
832              coll, index, ret, (unsigned long long) len);
833
834         if (!rq)
835                 return;
836
837         if (!coll) {
838                 blk_end_request(rq, ret, len);
839                 return;
840         }
841
842         q = rq->q;
843
844         spin_lock_irq(q->queue_lock);
845         coll->status[index].done = 1;
846         coll->status[index].rc = ret;
847         coll->status[index].bytes = len;
848         max = min = coll->num_done;
849         while (max < coll->total && coll->status[max].done)
850                 max++;
851
852         for (i = min; i<max; i++) {
853                 __blk_end_request(rq, coll->status[i].rc,
854                                   coll->status[i].bytes);
855                 coll->num_done++;
856                 kref_put(&coll->kref, rbd_coll_release);
857         }
858         spin_unlock_irq(q->queue_lock);
859 }
860
861 static void rbd_coll_end_req(struct rbd_request *req,
862                              int ret, u64 len)
863 {
864         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
865 }
866
867 /*
868  * Send ceph osd request
869  */
870 static int rbd_do_request(struct request *rq,
871                           struct rbd_device *rbd_dev,
872                           struct ceph_snap_context *snapc,
873                           u64 snapid,
874                           const char *object_name, u64 ofs, u64 len,
875                           struct bio *bio,
876                           struct page **pages,
877                           int num_pages,
878                           int flags,
879                           struct ceph_osd_req_op *ops,
880                           struct rbd_req_coll *coll,
881                           int coll_index,
882                           void (*rbd_cb)(struct ceph_osd_request *req,
883                                          struct ceph_msg *msg),
884                           struct ceph_osd_request **linger_req,
885                           u64 *ver)
886 {
887         struct ceph_osd_request *req;
888         struct ceph_file_layout *layout;
889         int ret;
890         u64 bno;
891         struct timespec mtime = CURRENT_TIME;
892         struct rbd_request *req_data;
893         struct ceph_osd_request_head *reqhead;
894         struct ceph_osd_client *osdc;
895
896         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
897         if (!req_data) {
898                 if (coll)
899                         rbd_coll_end_req_index(rq, coll, coll_index,
900                                                -ENOMEM, len);
901                 return -ENOMEM;
902         }
903
904         if (coll) {
905                 req_data->coll = coll;
906                 req_data->coll_index = coll_index;
907         }
908
909         dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
910                 (unsigned long long) ofs, (unsigned long long) len);
911
912         osdc = &rbd_dev->rbd_client->client->osdc;
913         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
914                                         false, GFP_NOIO, pages, bio);
915         if (!req) {
916                 ret = -ENOMEM;
917                 goto done_pages;
918         }
919
920         req->r_callback = rbd_cb;
921
922         req_data->rq = rq;
923         req_data->bio = bio;
924         req_data->pages = pages;
925         req_data->len = len;
926
927         req->r_priv = req_data;
928
929         reqhead = req->r_request->front.iov_base;
930         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
931
932         strncpy(req->r_oid, object_name, sizeof(req->r_oid));
933         req->r_oid_len = strlen(req->r_oid);
934
935         layout = &req->r_file_layout;
936         memset(layout, 0, sizeof(*layout));
937         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
938         layout->fl_stripe_count = cpu_to_le32(1);
939         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
940         layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
941         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
942                                 req, ops);
943
944         ceph_osdc_build_request(req, ofs, &len,
945                                 ops,
946                                 snapc,
947                                 &mtime,
948                                 req->r_oid, req->r_oid_len);
949
950         if (linger_req) {
951                 ceph_osdc_set_request_linger(osdc, req);
952                 *linger_req = req;
953         }
954
955         ret = ceph_osdc_start_request(osdc, req, false);
956         if (ret < 0)
957                 goto done_err;
958
959         if (!rbd_cb) {
960                 ret = ceph_osdc_wait_request(osdc, req);
961                 if (ver)
962                         *ver = le64_to_cpu(req->r_reassert_version.version);
963                 dout("reassert_ver=%llu\n",
964                         (unsigned long long)
965                                 le64_to_cpu(req->r_reassert_version.version));
966                 ceph_osdc_put_request(req);
967         }
968         return ret;
969
970 done_err:
971         bio_chain_put(req_data->bio);
972         ceph_osdc_put_request(req);
973 done_pages:
974         rbd_coll_end_req(req_data, ret, len);
975         kfree(req_data);
976         return ret;
977 }
978
979 /*
980  * Ceph osd op callback
981  */
982 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
983 {
984         struct rbd_request *req_data = req->r_priv;
985         struct ceph_osd_reply_head *replyhead;
986         struct ceph_osd_op *op;
987         __s32 rc;
988         u64 bytes;
989         int read_op;
990
991         /* parse reply */
992         replyhead = msg->front.iov_base;
993         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
994         op = (void *)(replyhead + 1);
995         rc = le32_to_cpu(replyhead->result);
996         bytes = le64_to_cpu(op->extent.length);
997         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
998
999         dout("rbd_req_cb bytes=%llu readop=%d rc=%d\n",
1000                 (unsigned long long) bytes, read_op, (int) rc);
1001
1002         if (rc == -ENOENT && read_op) {
1003                 zero_bio_chain(req_data->bio, 0);
1004                 rc = 0;
1005         } else if (rc == 0 && read_op && bytes < req_data->len) {
1006                 zero_bio_chain(req_data->bio, bytes);
1007                 bytes = req_data->len;
1008         }
1009
1010         rbd_coll_end_req(req_data, rc, bytes);
1011
1012         if (req_data->bio)
1013                 bio_chain_put(req_data->bio);
1014
1015         ceph_osdc_put_request(req);
1016         kfree(req_data);
1017 }
1018
/*
 * Completion callback for requests whose reply needs no processing:
 * it only drops a reference on the osd request.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1023
1024 /*
1025  * Do a synchronous ceph osd operation
1026  */
1027 static int rbd_req_sync_op(struct rbd_device *rbd_dev,
1028                            struct ceph_snap_context *snapc,
1029                            u64 snapid,
1030                            int flags,
1031                            struct ceph_osd_req_op *ops,
1032                            const char *object_name,
1033                            u64 ofs, u64 len,
1034                            char *buf,
1035                            struct ceph_osd_request **linger_req,
1036                            u64 *ver)
1037 {
1038         int ret;
1039         struct page **pages;
1040         int num_pages;
1041
1042         BUG_ON(ops == NULL);
1043
1044         num_pages = calc_pages_for(ofs , len);
1045         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1046         if (IS_ERR(pages))
1047                 return PTR_ERR(pages);
1048
1049         ret = rbd_do_request(NULL, rbd_dev, snapc, snapid,
1050                           object_name, ofs, len, NULL,
1051                           pages, num_pages,
1052                           flags,
1053                           ops,
1054                           NULL, 0,
1055                           NULL,
1056                           linger_req, ver);
1057         if (ret < 0)
1058                 goto done;
1059
1060         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1061                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1062
1063 done:
1064         ceph_release_page_vector(pages, num_pages);
1065         return ret;
1066 }
1067
1068 /*
1069  * Do an asynchronous ceph osd operation
1070  */
1071 static int rbd_do_op(struct request *rq,
1072                      struct rbd_device *rbd_dev,
1073                      struct ceph_snap_context *snapc,
1074                      u64 snapid,
1075                      int opcode, int flags,
1076                      u64 ofs, u64 len,
1077                      struct bio *bio,
1078                      struct rbd_req_coll *coll,
1079                      int coll_index)
1080 {
1081         char *seg_name;
1082         u64 seg_ofs;
1083         u64 seg_len;
1084         int ret;
1085         struct ceph_osd_req_op *ops;
1086         u32 payload_len;
1087
1088         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1089         if (!seg_name)
1090                 return -ENOMEM;
1091
1092         seg_len = rbd_get_segment(&rbd_dev->header,
1093                                   rbd_dev->header.object_prefix,
1094                                   ofs, len,
1095                                   seg_name, &seg_ofs);
1096
1097         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1098
1099         ret = -ENOMEM;
1100         ops = rbd_create_rw_ops(1, opcode, payload_len);
1101         if (!ops)
1102                 goto done;
1103
1104         /* we've taken care of segment sizes earlier when we
1105            cloned the bios. We should never have a segment
1106            truncated at this point */
1107         BUG_ON(seg_len < len);
1108
1109         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1110                              seg_name, seg_ofs, seg_len,
1111                              bio,
1112                              NULL, 0,
1113                              flags,
1114                              ops,
1115                              coll, coll_index,
1116                              rbd_req_cb, 0, NULL);
1117
1118         rbd_destroy_ops(ops);
1119 done:
1120         kfree(seg_name);
1121         return ret;
1122 }
1123
1124 /*
1125  * Request async osd write
1126  */
1127 static int rbd_req_write(struct request *rq,
1128                          struct rbd_device *rbd_dev,
1129                          struct ceph_snap_context *snapc,
1130                          u64 ofs, u64 len,
1131                          struct bio *bio,
1132                          struct rbd_req_coll *coll,
1133                          int coll_index)
1134 {
1135         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1136                          CEPH_OSD_OP_WRITE,
1137                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1138                          ofs, len, bio, coll, coll_index);
1139 }
1140
1141 /*
1142  * Request async osd read
1143  */
1144 static int rbd_req_read(struct request *rq,
1145                          struct rbd_device *rbd_dev,
1146                          u64 snapid,
1147                          u64 ofs, u64 len,
1148                          struct bio *bio,
1149                          struct rbd_req_coll *coll,
1150                          int coll_index)
1151 {
1152         return rbd_do_op(rq, rbd_dev, NULL,
1153                          snapid,
1154                          CEPH_OSD_OP_READ,
1155                          CEPH_OSD_FLAG_READ,
1156                          ofs, len, bio, coll, coll_index);
1157 }
1158
1159 /*
1160  * Request sync osd read
1161  */
1162 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
1163                           u64 snapid,
1164                           const char *object_name,
1165                           u64 ofs, u64 len,
1166                           char *buf,
1167                           u64 *ver)
1168 {
1169         struct ceph_osd_req_op *ops;
1170         int ret;
1171
1172         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_READ, 0);
1173         if (!ops)
1174                 return -ENOMEM;
1175
1176         ret = rbd_req_sync_op(rbd_dev, NULL,
1177                                snapid,
1178                                CEPH_OSD_FLAG_READ,
1179                                ops, object_name, ofs, len, buf, NULL, ver);
1180         rbd_destroy_ops(ops);
1181
1182         return ret;
1183 }
1184
1185 /*
1186  * Request sync osd watch
1187  */
1188 static int rbd_req_sync_notify_ack(struct rbd_device *rbd_dev,
1189                                    u64 ver,
1190                                    u64 notify_id)
1191 {
1192         struct ceph_osd_req_op *ops;
1193         int ret;
1194
1195         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1196         if (!ops)
1197                 return -ENOMEM;
1198
1199         ops[0].watch.ver = cpu_to_le64(ver);
1200         ops[0].watch.cookie = notify_id;
1201         ops[0].watch.flag = 0;
1202
1203         ret = rbd_do_request(NULL, rbd_dev, NULL, CEPH_NOSNAP,
1204                           rbd_dev->header_name, 0, 0, NULL,
1205                           NULL, 0,
1206                           CEPH_OSD_FLAG_READ,
1207                           ops,
1208                           NULL, 0,
1209                           rbd_simple_req_cb, 0, NULL);
1210
1211         rbd_destroy_ops(ops);
1212         return ret;
1213 }
1214
1215 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1216 {
1217         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1218         u64 hver;
1219         int rc;
1220
1221         if (!rbd_dev)
1222                 return;
1223
1224         dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
1225                 rbd_dev->header_name, (unsigned long long) notify_id,
1226                 (unsigned int) opcode);
1227         rc = rbd_refresh_header(rbd_dev, &hver);
1228         if (rc)
1229                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1230                            " update snaps: %d\n", rbd_dev->major, rc);
1231
1232         rbd_req_sync_notify_ack(rbd_dev, hver, notify_id);
1233 }
1234
1235 /*
1236  * Request sync osd watch
1237  */
1238 static int rbd_req_sync_watch(struct rbd_device *rbd_dev)
1239 {
1240         struct ceph_osd_req_op *ops;
1241         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1242         int ret;
1243
1244         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1245         if (!ops)
1246                 return -ENOMEM;
1247
1248         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1249                                      (void *)rbd_dev, &rbd_dev->watch_event);
1250         if (ret < 0)
1251                 goto fail;
1252
1253         ops[0].watch.ver = cpu_to_le64(rbd_dev->header.obj_version);
1254         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1255         ops[0].watch.flag = 1;
1256
1257         ret = rbd_req_sync_op(rbd_dev, NULL,
1258                               CEPH_NOSNAP,
1259                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1260                               ops,
1261                               rbd_dev->header_name,
1262                               0, 0, NULL,
1263                               &rbd_dev->watch_request, NULL);
1264
1265         if (ret < 0)
1266                 goto fail_event;
1267
1268         rbd_destroy_ops(ops);
1269         return 0;
1270
1271 fail_event:
1272         ceph_osdc_cancel_event(rbd_dev->watch_event);
1273         rbd_dev->watch_event = NULL;
1274 fail:
1275         rbd_destroy_ops(ops);
1276         return ret;
1277 }
1278
1279 /*
1280  * Request sync osd unwatch
1281  */
1282 static int rbd_req_sync_unwatch(struct rbd_device *rbd_dev)
1283 {
1284         struct ceph_osd_req_op *ops;
1285         int ret;
1286
1287         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_WATCH, 0);
1288         if (!ops)
1289                 return -ENOMEM;
1290
1291         ops[0].watch.ver = 0;
1292         ops[0].watch.cookie = cpu_to_le64(rbd_dev->watch_event->cookie);
1293         ops[0].watch.flag = 0;
1294
1295         ret = rbd_req_sync_op(rbd_dev, NULL,
1296                               CEPH_NOSNAP,
1297                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1298                               ops,
1299                               rbd_dev->header_name,
1300                               0, 0, NULL, NULL, NULL);
1301
1302
1303         rbd_destroy_ops(ops);
1304         ceph_osdc_cancel_event(rbd_dev->watch_event);
1305         rbd_dev->watch_event = NULL;
1306         return ret;
1307 }
1308
/*
 * Small wrapper holding a pointer to an rbd device.
 */
struct rbd_notify_info {
	struct rbd_device *rbd_dev;	/* device this info refers to */
};
1312
1313 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1314 {
1315         struct rbd_device *rbd_dev = (struct rbd_device *)data;
1316         if (!rbd_dev)
1317                 return;
1318
1319         dout("rbd_notify_cb %s notify_id=%llu opcode=%u\n",
1320                         rbd_dev->header_name, (unsigned long long) notify_id,
1321                         (unsigned int) opcode);
1322 }
1323
1324 /*
1325  * Request sync osd notify
1326  */
1327 static int rbd_req_sync_notify(struct rbd_device *rbd_dev)
1328 {
1329         struct ceph_osd_req_op *ops;
1330         struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
1331         struct ceph_osd_event *event;
1332         struct rbd_notify_info info;
1333         int payload_len = sizeof(u32) + sizeof(u32);
1334         int ret;
1335
1336         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_NOTIFY, payload_len);
1337         if (!ops)
1338                 return -ENOMEM;
1339
1340         info.rbd_dev = rbd_dev;
1341
1342         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1343                                      (void *)&info, &event);
1344         if (ret < 0)
1345                 goto fail;
1346
1347         ops[0].watch.ver = 1;
1348         ops[0].watch.flag = 1;
1349         ops[0].watch.cookie = event->cookie;
1350         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1351         ops[0].watch.timeout = 12;
1352
1353         ret = rbd_req_sync_op(rbd_dev, NULL,
1354                                CEPH_NOSNAP,
1355                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1356                                ops,
1357                                rbd_dev->header_name,
1358                                0, 0, NULL, NULL, NULL);
1359         if (ret < 0)
1360                 goto fail_event;
1361
1362         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1363         dout("ceph_osdc_wait_event returned %d\n", ret);
1364         rbd_destroy_ops(ops);
1365         return 0;
1366
1367 fail_event:
1368         ceph_osdc_cancel_event(event);
1369 fail:
1370         rbd_destroy_ops(ops);
1371         return ret;
1372 }
1373
1374 /*
1375  * Request sync osd read
1376  */
1377 static int rbd_req_sync_exec(struct rbd_device *rbd_dev,
1378                              const char *object_name,
1379                              const char *class_name,
1380                              const char *method_name,
1381                              const char *data,
1382                              int len,
1383                              u64 *ver)
1384 {
1385         struct ceph_osd_req_op *ops;
1386         int class_name_len = strlen(class_name);
1387         int method_name_len = strlen(method_name);
1388         int ret;
1389
1390         ops = rbd_create_rw_ops(1, CEPH_OSD_OP_CALL,
1391                                     class_name_len + method_name_len + len);
1392         if (!ops)
1393                 return -ENOMEM;
1394
1395         ops[0].cls.class_name = class_name;
1396         ops[0].cls.class_len = (__u8) class_name_len;
1397         ops[0].cls.method_name = method_name;
1398         ops[0].cls.method_len = (__u8) method_name_len;
1399         ops[0].cls.argc = 0;
1400         ops[0].cls.indata = data;
1401         ops[0].cls.indata_len = len;
1402
1403         ret = rbd_req_sync_op(rbd_dev, NULL,
1404                                CEPH_NOSNAP,
1405                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1406                                ops,
1407                                object_name, 0, 0, NULL, NULL, ver);
1408
1409         rbd_destroy_ops(ops);
1410
1411         dout("cls_exec returned %d\n", ret);
1412         return ret;
1413 }
1414
1415 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1416 {
1417         struct rbd_req_coll *coll =
1418                         kzalloc(sizeof(struct rbd_req_coll) +
1419                                 sizeof(struct rbd_req_status) * num_reqs,
1420                                 GFP_ATOMIC);
1421
1422         if (!coll)
1423                 return NULL;
1424         coll->total = num_reqs;
1425         kref_init(&coll->kref);
1426         return coll;
1427 }
1428
1429 /*
1430  * block device queue callback
1431  */
1432 static void rbd_rq_fn(struct request_queue *q)
1433 {
1434         struct rbd_device *rbd_dev = q->queuedata;
1435         struct request *rq;
1436         struct bio_pair *bp = NULL;
1437
1438         while ((rq = blk_fetch_request(q))) {
1439                 struct bio *bio;
1440                 struct bio *rq_bio, *next_bio = NULL;
1441                 bool do_write;
1442                 unsigned int size;
1443                 u64 op_size = 0;
1444                 u64 ofs;
1445                 int num_segs, cur_seg = 0;
1446                 struct rbd_req_coll *coll;
1447                 struct ceph_snap_context *snapc;
1448
1449                 /* peek at request from block layer */
1450                 if (!rq)
1451                         break;
1452
1453                 dout("fetched request\n");
1454
1455                 /* filter out block requests we don't understand */
1456                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1457                         __blk_end_request_all(rq, 0);
1458                         continue;
1459                 }
1460
1461                 /* deduce our operation (read, write) */
1462                 do_write = (rq_data_dir(rq) == WRITE);
1463
1464                 size = blk_rq_bytes(rq);
1465                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1466                 rq_bio = rq->bio;
1467                 if (do_write && rbd_dev->read_only) {
1468                         __blk_end_request_all(rq, -EROFS);
1469                         continue;
1470                 }
1471
1472                 spin_unlock_irq(q->queue_lock);
1473
1474                 down_read(&rbd_dev->header_rwsem);
1475
1476                 if (rbd_dev->snap_id != CEPH_NOSNAP && !rbd_dev->snap_exists) {
1477                         up_read(&rbd_dev->header_rwsem);
1478                         dout("request for non-existent snapshot");
1479                         spin_lock_irq(q->queue_lock);
1480                         __blk_end_request_all(rq, -ENXIO);
1481                         continue;
1482                 }
1483
1484                 snapc = ceph_get_snap_context(rbd_dev->header.snapc);
1485
1486                 up_read(&rbd_dev->header_rwsem);
1487
1488                 dout("%s 0x%x bytes at 0x%llx\n",
1489                      do_write ? "write" : "read",
1490                      size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
1491
1492                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1493                 coll = rbd_alloc_coll(num_segs);
1494                 if (!coll) {
1495                         spin_lock_irq(q->queue_lock);
1496                         __blk_end_request_all(rq, -ENOMEM);
1497                         ceph_put_snap_context(snapc);
1498                         continue;
1499                 }
1500
1501                 do {
1502                         /* a bio clone to be passed down to OSD req */
1503                         dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
1504                         op_size = rbd_get_segment(&rbd_dev->header,
1505                                                   rbd_dev->header.object_prefix,
1506                                                   ofs, size,
1507                                                   NULL, NULL);
1508                         kref_get(&coll->kref);
1509                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1510                                               op_size, GFP_ATOMIC);
1511                         if (!bio) {
1512                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1513                                                        -ENOMEM, op_size);
1514                                 goto next_seg;
1515                         }
1516
1517
1518                         /* init OSD command: write or read */
1519                         if (do_write)
1520                                 rbd_req_write(rq, rbd_dev,
1521                                               snapc,
1522                                               ofs,
1523                                               op_size, bio,
1524                                               coll, cur_seg);
1525                         else
1526                                 rbd_req_read(rq, rbd_dev,
1527                                              rbd_dev->snap_id,
1528                                              ofs,
1529                                              op_size, bio,
1530                                              coll, cur_seg);
1531
1532 next_seg:
1533                         size -= op_size;
1534                         ofs += op_size;
1535
1536                         cur_seg++;
1537                         rq_bio = next_bio;
1538                 } while (size > 0);
1539                 kref_put(&coll->kref, rbd_coll_release);
1540
1541                 if (bp)
1542                         bio_pair_release(bp);
1543                 spin_lock_irq(q->queue_lock);
1544
1545                 ceph_put_snap_context(snapc);
1546         }
1547 }
1548
1549 /*
1550  * a queue callback. Makes sure that we don't create a bio that spans across
1551  * multiple osd objects. One exception would be with a single page bios,
1552  * which we handle later at bio_chain_clone
1553  */
1554 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1555                           struct bio_vec *bvec)
1556 {
1557         struct rbd_device *rbd_dev = q->queuedata;
1558         unsigned int chunk_sectors;
1559         sector_t sector;
1560         unsigned int bio_sectors;
1561         int max;
1562
1563         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1564         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1565         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1566
1567         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1568                                  + bio_sectors)) << SECTOR_SHIFT;
1569         if (max < 0)
1570                 max = 0; /* bio_add cannot handle a negative return */
1571         if (max <= bvec->bv_len && bio_sectors == 0)
1572                 return bvec->bv_len;
1573         return max;
1574 }
1575
1576 static void rbd_free_disk(struct rbd_device *rbd_dev)
1577 {
1578         struct gendisk *disk = rbd_dev->disk;
1579
1580         if (!disk)
1581                 return;
1582
1583         rbd_header_free(&rbd_dev->header);
1584
1585         if (disk->flags & GENHD_FL_UP)
1586                 del_gendisk(disk);
1587         if (disk->queue)
1588                 blk_cleanup_queue(disk->queue);
1589         put_disk(disk);
1590 }
1591
1592 /*
1593  * reload the ondisk the header 
1594  */
1595 static int rbd_read_header(struct rbd_device *rbd_dev,
1596                            struct rbd_image_header *header)
1597 {
1598         ssize_t rc;
1599         struct rbd_image_header_ondisk *dh;
1600         u32 snap_count = 0;
1601         u64 ver;
1602         size_t len;
1603
1604         /*
1605          * First reads the fixed-size header to determine the number
1606          * of snapshots, then re-reads it, along with all snapshot
1607          * records as well as their stored names.
1608          */
1609         len = sizeof (*dh);
1610         while (1) {
1611                 dh = kmalloc(len, GFP_KERNEL);
1612                 if (!dh)
1613                         return -ENOMEM;
1614
1615                 rc = rbd_req_sync_read(rbd_dev,
1616                                        CEPH_NOSNAP,
1617                                        rbd_dev->header_name,
1618                                        0, len,
1619                                        (char *)dh, &ver);
1620                 if (rc < 0)
1621                         goto out_dh;
1622
1623                 rc = rbd_header_from_disk(header, dh, snap_count);
1624                 if (rc < 0) {
1625                         if (rc == -ENXIO)
1626                                 pr_warning("unrecognized header format"
1627                                            " for image %s\n",
1628                                            rbd_dev->image_name);
1629                         goto out_dh;
1630                 }
1631
1632                 if (snap_count == header->total_snaps)
1633                         break;
1634
1635                 snap_count = header->total_snaps;
1636                 len = sizeof (*dh) +
1637                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1638                         header->snap_names_len;
1639
1640                 rbd_header_free(header);
1641                 kfree(dh);
1642         }
1643         header->obj_version = ver;
1644
1645 out_dh:
1646         kfree(dh);
1647         return rc;
1648 }
1649
1650 /*
1651  * create a snapshot
1652  */
1653 static int rbd_header_add_snap(struct rbd_device *rbd_dev,
1654                                const char *snap_name,
1655                                gfp_t gfp_flags)
1656 {
1657         int name_len = strlen(snap_name);
1658         u64 new_snapid;
1659         int ret;
1660         void *data, *p, *e;
1661         struct ceph_mon_client *monc;
1662
1663         /* we should create a snapshot only if we're pointing at the head */
1664         if (rbd_dev->snap_id != CEPH_NOSNAP)
1665                 return -EINVAL;
1666
1667         monc = &rbd_dev->rbd_client->client->monc;
1668         ret = ceph_monc_create_snapid(monc, rbd_dev->pool_id, &new_snapid);
1669         dout("created snapid=%llu\n", (unsigned long long) new_snapid);
1670         if (ret < 0)
1671                 return ret;
1672
1673         data = kmalloc(name_len + 16, gfp_flags);
1674         if (!data)
1675                 return -ENOMEM;
1676
1677         p = data;
1678         e = data + name_len + 16;
1679
1680         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1681         ceph_encode_64_safe(&p, e, new_snapid, bad);
1682
1683         ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
1684                                 "rbd", "snap_add",
1685                                 data, p - data, NULL);
1686
1687         kfree(data);
1688
1689         return ret < 0 ? ret : 0;
1690 bad:
1691         return -ERANGE;
1692 }
1693
1694 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1695 {
1696         struct rbd_snap *snap;
1697         struct rbd_snap *next;
1698
1699         list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
1700                 __rbd_remove_snap_dev(snap);
1701 }
1702
1703 /*
1704  * only read the first part of the ondisk header, without the snaps info
1705  */
1706 static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1707 {
1708         int ret;
1709         struct rbd_image_header h;
1710
1711         ret = rbd_read_header(rbd_dev, &h);
1712         if (ret < 0)
1713                 return ret;
1714
1715         down_write(&rbd_dev->header_rwsem);
1716
1717         /* resized? */
1718         if (rbd_dev->snap_id == CEPH_NOSNAP) {
1719                 sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
1720
1721                 dout("setting size to %llu sectors", (unsigned long long) size);
1722                 set_capacity(rbd_dev->disk, size);
1723         }
1724
1725         /* rbd_dev->header.object_prefix shouldn't change */
1726         kfree(rbd_dev->header.snap_sizes);
1727         kfree(rbd_dev->header.snap_names);
1728         /* osd requests may still refer to snapc */
1729         ceph_put_snap_context(rbd_dev->header.snapc);
1730
1731         if (hver)
1732                 *hver = h.obj_version;
1733         rbd_dev->header.obj_version = h.obj_version;
1734         rbd_dev->header.image_size = h.image_size;
1735         rbd_dev->header.total_snaps = h.total_snaps;
1736         rbd_dev->header.snapc = h.snapc;
1737         rbd_dev->header.snap_names = h.snap_names;
1738         rbd_dev->header.snap_names_len = h.snap_names_len;
1739         rbd_dev->header.snap_sizes = h.snap_sizes;
1740         /* Free the extra copy of the object prefix */
1741         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1742         kfree(h.object_prefix);
1743
1744         ret = __rbd_init_snaps_header(rbd_dev);
1745
1746         up_write(&rbd_dev->header_rwsem);
1747
1748         return ret;
1749 }
1750
1751 static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
1752 {
1753         int ret;
1754
1755         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1756         ret = __rbd_refresh_header(rbd_dev, hver);
1757         mutex_unlock(&ctl_mutex);
1758
1759         return ret;
1760 }
1761
1762 static int rbd_init_disk(struct rbd_device *rbd_dev)
1763 {
1764         struct gendisk *disk;
1765         struct request_queue *q;
1766         int rc;
1767         u64 segment_size;
1768         u64 total_size = 0;
1769
1770         /* contact OSD, request size info about the object being mapped */
1771         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1772         if (rc)
1773                 return rc;
1774
1775         /* no need to lock here, as rbd_dev is not registered yet */
1776         rc = __rbd_init_snaps_header(rbd_dev);
1777         if (rc)
1778                 return rc;
1779
1780         rc = rbd_header_set_snap(rbd_dev, &total_size);
1781         if (rc)
1782                 return rc;
1783
1784         /* create gendisk info */
1785         rc = -ENOMEM;
1786         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1787         if (!disk)
1788                 goto out;
1789
1790         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1791                  rbd_dev->dev_id);
1792         disk->major = rbd_dev->major;
1793         disk->first_minor = 0;
1794         disk->fops = &rbd_bd_ops;
1795         disk->private_data = rbd_dev;
1796
1797         /* init rq */
1798         rc = -ENOMEM;
1799         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1800         if (!q)
1801                 goto out_disk;
1802
1803         /* We use the default size, but let's be explicit about it. */
1804         blk_queue_physical_block_size(q, SECTOR_SIZE);
1805
1806         /* set io sizes to object size */
1807         segment_size = rbd_obj_bytes(&rbd_dev->header);
1808         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1809         blk_queue_max_segment_size(q, segment_size);
1810         blk_queue_io_min(q, segment_size);
1811         blk_queue_io_opt(q, segment_size);
1812
1813         blk_queue_merge_bvec(q, rbd_merge_bvec);
1814         disk->queue = q;
1815
1816         q->queuedata = rbd_dev;
1817
1818         rbd_dev->disk = disk;
1819         rbd_dev->q = q;
1820
1821         /* finally, announce the disk to the world */
1822         set_capacity(disk, total_size / SECTOR_SIZE);
1823         add_disk(disk);
1824
1825         pr_info("%s: added with size 0x%llx\n",
1826                 disk->disk_name, (unsigned long long)total_size);
1827         return 0;
1828
1829 out_disk:
1830         put_disk(disk);
1831 out:
1832         return rc;
1833 }
1834
1835 /*
1836   sysfs
1837 */
1838
1839 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1840 {
1841         return container_of(dev, struct rbd_device, dev);
1842 }
1843
1844 static ssize_t rbd_size_show(struct device *dev,
1845                              struct device_attribute *attr, char *buf)
1846 {
1847         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1848         sector_t size;
1849
1850         down_read(&rbd_dev->header_rwsem);
1851         size = get_capacity(rbd_dev->disk);
1852         up_read(&rbd_dev->header_rwsem);
1853
1854         return sprintf(buf, "%llu\n", (unsigned long long) size * SECTOR_SIZE);
1855 }
1856
1857 static ssize_t rbd_major_show(struct device *dev,
1858                               struct device_attribute *attr, char *buf)
1859 {
1860         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1861
1862         return sprintf(buf, "%d\n", rbd_dev->major);
1863 }
1864
1865 static ssize_t rbd_client_id_show(struct device *dev,
1866                                   struct device_attribute *attr, char *buf)
1867 {
1868         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1869
1870         return sprintf(buf, "client%lld\n",
1871                         ceph_client_id(rbd_dev->rbd_client->client));
1872 }
1873
1874 static ssize_t rbd_pool_show(struct device *dev,
1875                              struct device_attribute *attr, char *buf)
1876 {
1877         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1878
1879         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1880 }
1881
1882 static ssize_t rbd_pool_id_show(struct device *dev,
1883                              struct device_attribute *attr, char *buf)
1884 {
1885         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1886
1887         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1888 }
1889
1890 static ssize_t rbd_name_show(struct device *dev,
1891                              struct device_attribute *attr, char *buf)
1892 {
1893         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1894
1895         return sprintf(buf, "%s\n", rbd_dev->image_name);
1896 }
1897
1898 static ssize_t rbd_snap_show(struct device *dev,
1899                              struct device_attribute *attr,
1900                              char *buf)
1901 {
1902         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1903
1904         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1905 }
1906
1907 static ssize_t rbd_image_refresh(struct device *dev,
1908                                  struct device_attribute *attr,
1909                                  const char *buf,
1910                                  size_t size)
1911 {
1912         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1913         int ret;
1914
1915         ret = rbd_refresh_header(rbd_dev, NULL);
1916
1917         return ret < 0 ? ret : size;
1918 }
1919
/*
 * sysfs attributes of a mapped rbd device.  The S_IRUGO entries are
 * read-only and supply only a "show" handler; "refresh" and
 * "create_snap" are S_IWUSR (owner-writable) and supply only a
 * "store" handler.
 */
1920 static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
1921 static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
1922 static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
1923 static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
1924 static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
1925 static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
1926 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
1927 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
1928 static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1929
/* NULL-terminated list of all of the attributes declared above */
1930 static struct attribute *rbd_attrs[] = {
1931         &dev_attr_size.attr,
1932         &dev_attr_major.attr,
1933         &dev_attr_client_id.attr,
1934         &dev_attr_pool.attr,
1935         &dev_attr_pool_id.attr,
1936         &dev_attr_name.attr,
1937         &dev_attr_current_snap.attr,
1938         &dev_attr_refresh.attr,
1939         &dev_attr_create_snap.attr,
1940         NULL
1941 };
1942
/* The single (unnamed) attribute group holding rbd_attrs */
1943 static struct attribute_group rbd_attr_group = {
1944         .attrs = rbd_attrs,
1945 };
1946
/* NULL-terminated group list, referenced by rbd_device_type.groups */
1947 static const struct attribute_group *rbd_attr_groups[] = {
1948         &rbd_attr_group,
1949         NULL
1950 };
1951
/*
 * Release callback installed in rbd_device_type.  It intentionally
 * does nothing.  rbd_bus_add_dev() also sets dev->release to
 * rbd_dev_release(), which performs the real teardown.
 * NOTE(review): presumably the driver core prefers dev->release over
 * the type's release, so this is never the one invoked -- confirm.
 */
1952 static void rbd_sysfs_dev_release(struct device *dev)
1953 {
1954 }
1955
/*
 * Device type assigned to every rbd device by rbd_bus_add_dev().  It
 * supplies the attribute groups listed in rbd_attr_groups.
 */
1956 static struct device_type rbd_device_type = {
1957         .name           = "rbd",
1958         .groups         = rbd_attr_groups,
1959         .release        = rbd_sysfs_dev_release,
1960 };
1961
1962
1963 /*
1964   sysfs - snapshots
1965 */
1966
1967 static ssize_t rbd_snap_size_show(struct device *dev,
1968                                   struct device_attribute *attr,
1969                                   char *buf)
1970 {
1971         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1972
1973         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1974 }
1975
1976 static ssize_t rbd_snap_id_show(struct device *dev,
1977                                 struct device_attribute *attr,
1978                                 char *buf)
1979 {
1980         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1981
1982         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1983 }
1984
/*
 * sysfs attributes of a snapshot device: both are read-only (S_IRUGO)
 * and supply only a "show" handler.
 */
1985 static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
1986 static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);
1987
/* NULL-terminated list of the snapshot attributes declared above */
1988 static struct attribute *rbd_snap_attrs[] = {
1989         &dev_attr_snap_size.attr,
1990         &dev_attr_snap_id.attr,
1991         NULL,
1992 };
1993
/* The single (unnamed) attribute group holding rbd_snap_attrs */
1994 static struct attribute_group rbd_snap_attr_group = {
1995         .attrs = rbd_snap_attrs,
1996 };
1997
1998 static void rbd_snap_dev_release(struct device *dev)
1999 {
2000         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
2001         kfree(snap->name);
2002         kfree(snap);
2003 }
2004
/* NULL-terminated group list, referenced by rbd_snap_device_type.groups */
2005 static const struct attribute_group *rbd_snap_attr_groups[] = {
2006         &rbd_snap_attr_group,
2007         NULL
2008 };
2009
/*
 * Device type assigned to each snapshot device by
 * rbd_register_snap_dev().  Its release callback frees the rbd_snap.
 */
2010 static struct device_type rbd_snap_device_type = {
2011         .groups         = rbd_snap_attr_groups,
2012         .release        = rbd_snap_dev_release,
2013 };
2014
2015 static void __rbd_remove_snap_dev(struct rbd_snap *snap)
2016 {
2017         list_del(&snap->node);
2018         device_unregister(&snap->dev);
2019 }
2020
2021 static int rbd_register_snap_dev(struct rbd_snap *snap,
2022                                   struct device *parent)
2023 {
2024         struct device *dev = &snap->dev;
2025         int ret;
2026
2027         dev->type = &rbd_snap_device_type;
2028         dev->parent = parent;
2029         dev->release = rbd_snap_dev_release;
2030         dev_set_name(dev, "snap_%s", snap->name);
2031         ret = device_register(dev);
2032
2033         return ret;
2034 }
2035
2036 static struct rbd_snap *__rbd_add_snap_dev(struct rbd_device *rbd_dev,
2037                                               int i, const char *name)
2038 {
2039         struct rbd_snap *snap;
2040         int ret;
2041
2042         snap = kzalloc(sizeof (*snap), GFP_KERNEL);
2043         if (!snap)
2044                 return ERR_PTR(-ENOMEM);
2045
2046         ret = -ENOMEM;
2047         snap->name = kstrdup(name, GFP_KERNEL);
2048         if (!snap->name)
2049                 goto err;
2050
2051         snap->size = rbd_dev->header.snap_sizes[i];
2052         snap->id = rbd_dev->header.snapc->snaps[i];
2053         if (device_is_registered(&rbd_dev->dev)) {
2054                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2055                 if (ret < 0)
2056                         goto err;
2057         }
2058
2059         return snap;
2060
2061 err:
2062         kfree(snap->name);
2063         kfree(snap);
2064
2065         return ERR_PTR(ret);
2066 }
2067
/*
 * Step backward through a list of '\0'-terminated strings that begins
 * at "start".  "name" points just past the terminator of the string
 * to be found (i.e. at the following string, or at the end of the
 * list).  Returns the beginning of that preceding string, or NULL if
 * "name" is less than two bytes beyond "start".
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *cursor;

	if (start + 2 > name)
		return NULL;

	/* Skip the terminator just before "name", then scan backward */
	for (cursor = name - 2; *cursor != '\0'; cursor--)
		if (cursor == start)
			return start;

	return cursor + 1;
}
2084
2085 /*
2086  * compare the old list of snapshots that we have to what's in the header
2087  * and update it accordingly. Note that the header holds the snapshots
2088  * in a reverse order (from newest to oldest) and we need to go from
2089  * older to new so that we don't get a duplicate snap name when
2090  * doing the process (e.g., removed snapshot and recreated a new
2091  * one with the same name.
2092  */
2093 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
2094 {
2095         const char *name, *first_name;
2096         int i = rbd_dev->header.total_snaps;
2097         struct rbd_snap *snap, *old_snap = NULL;
2098         struct list_head *p, *n;
2099
2100         first_name = rbd_dev->header.snap_names;
2101         name = first_name + rbd_dev->header.snap_names_len;
2102
2103         list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
2104                 u64 cur_id;
2105
2106                 old_snap = list_entry(p, struct rbd_snap, node);
2107
2108                 if (i)
2109                         cur_id = rbd_dev->header.snapc->snaps[i - 1];
2110
2111                 if (!i || old_snap->id < cur_id) {
2112                         /*
2113                          * old_snap->id was skipped, thus was
2114                          * removed.  If this rbd_dev is mapped to
2115                          * the removed snapshot, record that it no
2116                          * longer exists, to prevent further I/O.
2117                          */
2118                         if (rbd_dev->snap_id == old_snap->id)
2119                                 rbd_dev->snap_exists = false;
2120                         __rbd_remove_snap_dev(old_snap);
2121                         continue;
2122                 }
2123                 if (old_snap->id == cur_id) {
2124                         /* we have this snapshot already */
2125                         i--;
2126                         name = rbd_prev_snap_name(name, first_name);
2127                         continue;
2128                 }
2129                 for (; i > 0;
2130                      i--, name = rbd_prev_snap_name(name, first_name)) {
2131                         if (!name) {
2132                                 WARN_ON(1);
2133                                 return -EINVAL;
2134                         }
2135                         cur_id = rbd_dev->header.snapc->snaps[i];
2136                         /* snapshot removal? handle it above */
2137                         if (cur_id >= old_snap->id)
2138                                 break;
2139                         /* a new snapshot */
2140                         snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2141                         if (IS_ERR(snap))
2142                                 return PTR_ERR(snap);
2143
2144                         /* note that we add it backward so using n and not p */
2145                         list_add(&snap->node, n);
2146                         p = &snap->node;
2147                 }
2148         }
2149         /* we're done going over the old snap list, just add what's left */
2150         for (; i > 0; i--) {
2151                 name = rbd_prev_snap_name(name, first_name);
2152                 if (!name) {
2153                         WARN_ON(1);
2154                         return -EINVAL;
2155                 }
2156                 snap = __rbd_add_snap_dev(rbd_dev, i - 1, name);
2157                 if (IS_ERR(snap))
2158                         return PTR_ERR(snap);
2159                 list_add(&snap->node, &rbd_dev->snaps);
2160         }
2161
2162         return 0;
2163 }
2164
2165 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2166 {
2167         int ret;
2168         struct device *dev;
2169         struct rbd_snap *snap;
2170
2171         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2172         dev = &rbd_dev->dev;
2173
2174         dev->bus = &rbd_bus_type;
2175         dev->type = &rbd_device_type;
2176         dev->parent = &rbd_root_dev;
2177         dev->release = rbd_dev_release;
2178         dev_set_name(dev, "%d", rbd_dev->dev_id);
2179         ret = device_register(dev);
2180         if (ret < 0)
2181                 goto out;
2182
2183         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2184                 ret = rbd_register_snap_dev(snap, &rbd_dev->dev);
2185                 if (ret < 0)
2186                         break;
2187         }
2188 out:
2189         mutex_unlock(&ctl_mutex);
2190         return ret;
2191 }
2192
2193 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2194 {
2195         device_unregister(&rbd_dev->dev);
2196 }
2197
2198 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2199 {
2200         int ret, rc;
2201
2202         do {
2203                 ret = rbd_req_sync_watch(rbd_dev);
2204                 if (ret == -ERANGE) {
2205                         rc = rbd_refresh_header(rbd_dev, NULL);
2206                         if (rc < 0)
2207                                 return rc;
2208                 }
2209         } while (ret == -ERANGE);
2210
2211         return ret;
2212 }
2213
2214 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2215
2216 /*
2217  * Get a unique rbd identifier for the given new rbd_dev, and add
2218  * the rbd_dev to the global list.  The minimum rbd id is 1.
2219  */
2220 static void rbd_id_get(struct rbd_device *rbd_dev)
2221 {
2222         rbd_dev->dev_id = atomic64_inc_return(&rbd_id_max);
2223
2224         spin_lock(&rbd_dev_list_lock);
2225         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2226         spin_unlock(&rbd_dev_list_lock);
2227 }
2228
2229 /*
2230  * Remove an rbd_dev from the global list, and record that its
2231  * identifier is no longer in use.
2232  */
2233 static void rbd_id_put(struct rbd_device *rbd_dev)
2234 {
2235         struct list_head *tmp;
2236         int rbd_id = rbd_dev->dev_id;
2237         int max_id;
2238
2239         BUG_ON(rbd_id < 1);
2240
2241         spin_lock(&rbd_dev_list_lock);
2242         list_del_init(&rbd_dev->node);
2243
2244         /*
2245          * If the id being "put" is not the current maximum, there
2246          * is nothing special we need to do.
2247          */
2248         if (rbd_id != atomic64_read(&rbd_id_max)) {
2249                 spin_unlock(&rbd_dev_list_lock);
2250                 return;
2251         }
2252
2253         /*
2254          * We need to update the current maximum id.  Search the
2255          * list to find out what it is.  We're more likely to find
2256          * the maximum at the end, so search the list backward.
2257          */
2258         max_id = 0;
2259         list_for_each_prev(tmp, &rbd_dev_list) {
2260                 struct rbd_device *rbd_dev;
2261
2262                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2263                 if (rbd_id > max_id)
2264                         max_id = rbd_id;
2265         }
2266         spin_unlock(&rbd_dev_list_lock);
2267
2268         /*
2269          * The max id could have been updated by rbd_id_get(), in
2270          * which case it now accurately reflects the new maximum.
2271          * Be careful not to overwrite the maximum value in that
2272          * case.
2273          */
2274         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2275 }
2276
/*
 * Advance *buf past any leading white space so that it points at the
 * start of the next token (or at the terminating '\0' if there is
 * none), and return the length of that token, i.e. the number of
 * consecutive non-white-space characters found there.  *buf must be
 * a '\0'-terminated string.
 */
static inline size_t next_token(const char **buf)
{
	/*
	 * The characters for which isspace() is nonzero in the "C"
	 * and "POSIX" locales.
	 */
	const char *whitespace = " \f\n\r\t\v";
	const char *token = *buf;

	token += strspn(token, whitespace);	/* skip to token start */
	*buf = token;

	return strcspn(token, whitespace);	/* token length */
}
2295
/*
 * Locate the next token in *buf and, provided it fits, copy it into
 * "token" (which has room for token_size bytes) as a '\0'-terminated
 * string.  *buf must be '\0'-terminated on entry.
 *
 * Returns the token's length, not counting the '\0'.  A result of 0
 * means no token was found; a result >= token_size means the token
 * did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token found.
 */
static inline size_t copy_token(const char **buf,
				char *token,
				size_t token_size)
{
	size_t token_len = next_token(buf);
	const char *token_start = *buf;

	*buf = token_start + token_len;

	if (token_len >= token_size)
		return token_len;

	memcpy(token, token_start, token_len);
	token[token_len] = '\0';

	return token_len;
}
2325
2326 /*
2327  * Finds the next token in *buf, dynamically allocates a buffer big
2328  * enough to hold a copy of it, and copies the token into the new
2329  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2330  * that a duplicate buffer is created even for a zero-length token.
2331  *
2332  * Returns a pointer to the newly-allocated duplicate, or a null
2333  * pointer if memory for the duplicate was not available.  If
2334  * the lenp argument is a non-null pointer, the length of the token
2335  * (not including the '\0') is returned in *lenp.
2336  *
2337  * If successful, the *buf pointer will be updated to point beyond
2338  * the end of the found token.
2339  *
2340  * Note: uses GFP_KERNEL for allocation.
2341  */
2342 static inline char *dup_token(const char **buf, size_t *lenp)
2343 {
2344         char *dup;
2345         size_t len;
2346
2347         len = next_token(buf);
2348         dup = kmalloc(len + 1, GFP_KERNEL);
2349         if (!dup)
2350                 return NULL;
2351
2352         memcpy(dup, *buf, len);
2353         *(dup + len) = '\0';
2354         *buf += len;
2355
2356         if (lenp)
2357                 *lenp = len;
2358
2359         return dup;
2360 }
2361
2362 /*
2363  * This fills in the pool_name, image_name, image_name_len, snap_name,
2364  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2365  * on the list of monitor addresses and other options provided via
2366  * /sys/bus/rbd/add.
2367  *
2368  * Note: rbd_dev is assumed to have been initially zero-filled.
2369  */
2370 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2371                               const char *buf,
2372                               const char **mon_addrs,
2373                               size_t *mon_addrs_size,
2374                               char *options,
2375                              size_t options_size)
2376 {
2377         size_t len;
2378         int ret;
2379
2380         /* The first four tokens are required */
2381
2382         len = next_token(&buf);
2383         if (!len)
2384                 return -EINVAL;
2385         *mon_addrs_size = len + 1;
2386         *mon_addrs = buf;
2387
2388         buf += len;
2389
2390         len = copy_token(&buf, options, options_size);
2391         if (!len || len >= options_size)
2392                 return -EINVAL;
2393
2394         ret = -ENOMEM;
2395         rbd_dev->pool_name = dup_token(&buf, NULL);
2396         if (!rbd_dev->pool_name)
2397                 goto out_err;
2398
2399         rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
2400         if (!rbd_dev->image_name)
2401                 goto out_err;
2402
2403         /* Create the name of the header object */
2404
2405         rbd_dev->header_name = kmalloc(rbd_dev->image_name_len
2406                                                 + sizeof (RBD_SUFFIX),
2407                                         GFP_KERNEL);
2408         if (!rbd_dev->header_name)
2409                 goto out_err;
2410         sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
2411
2412         /*
2413          * The snapshot name is optional.  If none is is supplied,
2414          * we use the default value.
2415          */
2416         rbd_dev->snap_name = dup_token(&buf, &len);
2417         if (!rbd_dev->snap_name)
2418                 goto out_err;
2419         if (!len) {
2420                 /* Replace the empty name with the default */
2421                 kfree(rbd_dev->snap_name);
2422                 rbd_dev->snap_name
2423                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2424                 if (!rbd_dev->snap_name)
2425                         goto out_err;
2426
2427                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2428                         sizeof (RBD_SNAP_HEAD_NAME));
2429         }
2430
2431         return 0;
2432
2433 out_err:
2434         kfree(rbd_dev->header_name);
2435         kfree(rbd_dev->image_name);
2436         kfree(rbd_dev->pool_name);
2437         rbd_dev->pool_name = NULL;
2438
2439         return ret;
2440 }
2441
2442 static ssize_t rbd_add(struct bus_type *bus,
2443                        const char *buf,
2444                        size_t count)
2445 {
2446         char *options;
2447         struct rbd_device *rbd_dev = NULL;
2448         const char *mon_addrs = NULL;
2449         size_t mon_addrs_size = 0;
2450         struct ceph_osd_client *osdc;
2451         int rc = -ENOMEM;
2452
2453         if (!try_module_get(THIS_MODULE))
2454                 return -ENODEV;
2455
2456         options = kmalloc(count, GFP_KERNEL);
2457         if (!options)
2458                 goto err_nomem;
2459         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2460         if (!rbd_dev)
2461                 goto err_nomem;
2462
2463         /* static rbd_device initialization */
2464         spin_lock_init(&rbd_dev->lock);
2465         INIT_LIST_HEAD(&rbd_dev->node);
2466         INIT_LIST_HEAD(&rbd_dev->snaps);
2467         init_rwsem(&rbd_dev->header_rwsem);
2468
2469         /* generate unique id: find highest unique id, add one */
2470         rbd_id_get(rbd_dev);
2471
2472         /* Fill in the device name, now that we have its id. */
2473         BUILD_BUG_ON(DEV_NAME_LEN
2474                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2475         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
2476
2477         /* parse add command */
2478         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2479                                 options, count);
2480         if (rc)
2481                 goto err_put_id;
2482
2483         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2484                                                 options);
2485         if (IS_ERR(rbd_dev->rbd_client)) {
2486                 rc = PTR_ERR(rbd_dev->rbd_client);
2487                 goto err_put_id;
2488         }
2489
2490         /* pick the pool */
2491         osdc = &rbd_dev->rbd_client->client->osdc;
2492         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2493         if (rc < 0)
2494                 goto err_out_client;
2495         rbd_dev->pool_id = rc;
2496
2497         /* register our block device */
2498         rc = register_blkdev(0, rbd_dev->name);
2499         if (rc < 0)
2500                 goto err_out_client;
2501         rbd_dev->major = rc;
2502
2503         rc = rbd_bus_add_dev(rbd_dev);
2504         if (rc)
2505                 goto err_out_blkdev;
2506
2507         /*
2508          * At this point cleanup in the event of an error is the job
2509          * of the sysfs code (initiated by rbd_bus_del_dev()).
2510          *
2511          * Set up and announce blkdev mapping.
2512          */
2513         rc = rbd_init_disk(rbd_dev);
2514         if (rc)
2515                 goto err_out_bus;
2516
2517         rc = rbd_init_watch_dev(rbd_dev);
2518         if (rc)
2519                 goto err_out_bus;
2520
2521         return count;
2522
2523 err_out_bus:
2524         /* this will also clean up rest of rbd_dev stuff */
2525
2526         rbd_bus_del_dev(rbd_dev);
2527         kfree(options);
2528         return rc;
2529
2530 err_out_blkdev:
2531         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2532 err_out_client:
2533         rbd_put_client(rbd_dev);
2534 err_put_id:
2535         if (rbd_dev->pool_name) {
2536                 kfree(rbd_dev->snap_name);
2537                 kfree(rbd_dev->header_name);
2538                 kfree(rbd_dev->image_name);
2539                 kfree(rbd_dev->pool_name);
2540         }
2541         rbd_id_put(rbd_dev);
2542 err_nomem:
2543         kfree(rbd_dev);
2544         kfree(options);
2545
2546         dout("Error adding device %s\n", buf);
2547         module_put(THIS_MODULE);
2548
2549         return (ssize_t) rc;
2550 }
2551
2552 static struct rbd_device *__rbd_get_dev(unsigned long dev_id)
2553 {
2554         struct list_head *tmp;
2555         struct rbd_device *rbd_dev;
2556
2557         spin_lock(&rbd_dev_list_lock);
2558         list_for_each(tmp, &rbd_dev_list) {
2559                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2560                 if (rbd_dev->dev_id == dev_id) {
2561                         spin_unlock(&rbd_dev_list_lock);
2562                         return rbd_dev;
2563                 }
2564         }
2565         spin_unlock(&rbd_dev_list_lock);
2566         return NULL;
2567 }
2568
2569 static void rbd_dev_release(struct device *dev)
2570 {
2571         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2572
2573         if (rbd_dev->watch_request) {
2574                 struct ceph_client *client = rbd_dev->rbd_client->client;
2575
2576                 ceph_osdc_unregister_linger_request(&client->osdc,
2577                                                     rbd_dev->watch_request);
2578         }
2579         if (rbd_dev->watch_event)
2580                 rbd_req_sync_unwatch(rbd_dev);
2581
2582         rbd_put_client(rbd_dev);
2583
2584         /* clean up and free blkdev */
2585         rbd_free_disk(rbd_dev);
2586         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2587
2588         /* done with the id, and with the rbd_dev */
2589         kfree(rbd_dev->snap_name);
2590         kfree(rbd_dev->header_name);
2591         kfree(rbd_dev->pool_name);
2592         kfree(rbd_dev->image_name);
2593         rbd_id_put(rbd_dev);
2594         kfree(rbd_dev);
2595
2596         /* release module ref */
2597         module_put(THIS_MODULE);
2598 }
2599
2600 static ssize_t rbd_remove(struct bus_type *bus,
2601                           const char *buf,
2602                           size_t count)
2603 {
2604         struct rbd_device *rbd_dev = NULL;
2605         int target_id, rc;
2606         unsigned long ul;
2607         int ret = count;
2608
2609         rc = strict_strtoul(buf, 10, &ul);
2610         if (rc)
2611                 return rc;
2612
2613         /* convert to int; abort if we lost anything in the conversion */
2614         target_id = (int) ul;
2615         if (target_id != ul)
2616                 return -EINVAL;
2617
2618         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2619
2620         rbd_dev = __rbd_get_dev(target_id);
2621         if (!rbd_dev) {
2622                 ret = -ENOENT;
2623                 goto done;
2624         }
2625
2626         __rbd_remove_all_snaps(rbd_dev);
2627         rbd_bus_del_dev(rbd_dev);
2628
2629 done:
2630         mutex_unlock(&ctl_mutex);
2631         return ret;
2632 }
2633
2634 static ssize_t rbd_snap_add(struct device *dev,
2635                             struct device_attribute *attr,
2636                             const char *buf,
2637                             size_t count)
2638 {
2639         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2640         int ret;
2641         char *name = kmalloc(count + 1, GFP_KERNEL);
2642         if (!name)
2643                 return -ENOMEM;
2644
2645         snprintf(name, count, "%s", buf);
2646
2647         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2648
2649         ret = rbd_header_add_snap(rbd_dev,
2650                                   name, GFP_KERNEL);
2651         if (ret < 0)
2652                 goto err_unlock;
2653
2654         ret = __rbd_refresh_header(rbd_dev, NULL);
2655         if (ret < 0)
2656                 goto err_unlock;
2657
2658         /* shouldn't hold ctl_mutex when notifying.. notify might
2659            trigger a watch callback that would need to get that mutex */
2660         mutex_unlock(&ctl_mutex);
2661
2662         /* make a best effort, don't error if failed */
2663         rbd_req_sync_notify(rbd_dev);
2664
2665         ret = count;
2666         kfree(name);
2667         return ret;
2668
2669 err_unlock:
2670         mutex_unlock(&ctl_mutex);
2671         kfree(name);
2672         return ret;
2673 }
2674
2675 /*
2676  * create control files in sysfs
2677  * /sys/bus/rbd/...
2678  */
2679 static int rbd_sysfs_init(void)
2680 {
2681         int ret;
2682
2683         ret = device_register(&rbd_root_dev);
2684         if (ret < 0)
2685                 return ret;
2686
2687         ret = bus_register(&rbd_bus_type);
2688         if (ret < 0)
2689                 device_unregister(&rbd_root_dev);
2690
2691         return ret;
2692 }
2693
/*
 * Undo rbd_sysfs_init(): unregister the rbd bus and then the rbd
 * root device, i.e. in the reverse of the order they were registered.
 */
2694 static void rbd_sysfs_cleanup(void)
2695 {
2696         bus_unregister(&rbd_bus_type);
2697         device_unregister(&rbd_root_dev);
2698 }
2699
2700 int __init rbd_init(void)
2701 {
2702         int rc;
2703
2704         rc = rbd_sysfs_init();
2705         if (rc)
2706                 return rc;
2707         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2708         return 0;
2709 }
2710
/* Module exit point: tear down the sysfs interface set up by rbd_init(). */
2711 void __exit rbd_exit(void)
2712 {
2713         rbd_sysfs_cleanup();
2714 }
2715
/* Hook rbd_init()/rbd_exit() up as the module's load/unload handlers */
2716 module_init(rbd_init);
2717 module_exit(rbd_exit);
2718
/* Module metadata */
2719 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2720 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2721 MODULE_DESCRIPTION("rados block device");
2722
2723 /* following authorship retained from original osdblk.c */
2724 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2725
2726 MODULE_LICENSE("GPL");