rbd: dynamically allocate snapshot name
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
/*
 * Block I/O in Linux is counted in sectors.  Several layers (blk, bio,
 * genhd) interpret the sector, but all of them default to 512 bytes.
 * Naming the shift and the size here reads better than bare 9s and
 * 512s spread through the driver.
 */
#define SECTOR_SHIFT	9
#define SECTOR_SIZE	(1ULL << SECTOR_SHIFT)
52
/* Short and descriptive driver names */
#define RBD_DRV_NAME		"rbd"
#define RBD_DRV_NAME_LONG	"rbd (rados block device)"

/* max minors per blkdev */
#define RBD_MINORS_PER_MAJOR	256

#define RBD_MAX_SNAP_NAME_LEN	32
#define RBD_MAX_OPT_LEN		1024

/* Snapshot name that selects the image head rather than a snapshot */
#define RBD_SNAP_HEAD_NAME	"-"

/*
 * A device is named "rbd#": RBD_DRV_NAME followed by a unique integer
 * id.  DEV_NAME_LEN has to be large enough for any such name, and
 * MAX_INT_FORMAT_WIDTH is the helper used to check that: it budgets
 * 5/2 characters per byte of an int, plus one more (presumably for a
 * sign).  With a 4-byte int that comes to 11 characters.
 */
#define DEV_NAME_LEN		32
#define MAX_INT_FORMAT_WIDTH	((5 * sizeof (int)) / 2 + 1)

#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
73
74 /*
75  * block device image metadata (in-memory version)
76  */
77 struct rbd_image_header {
78         u64 image_size;
79         char *object_prefix;
80         __u8 obj_order;
81         __u8 crypt_type;
82         __u8 comp_type;
83         struct ceph_snap_context *snapc;
84         size_t snap_names_len;
85         u64 snap_seq;
86         u32 total_snaps;
87
88         char *snap_names;
89         u64 *snap_sizes;
90
91         u64 obj_version;
92 };
93
/*
 * rbd-specific options, filled in by parse_rbd_opts_token().
 */
struct rbd_options {
	int notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
97
98 /*
99  * an instance of the client.  multiple devices may share an rbd client.
100  */
101 struct rbd_client {
102         struct ceph_client      *client;
103         struct rbd_options      *rbd_opts;
104         struct kref             kref;
105         struct list_head        node;
106 };
107
108 /*
109  * a request completion status
110  */
111 struct rbd_req_status {
112         int done;
113         int rc;
114         u64 bytes;
115 };
116
117 /*
118  * a collection of requests
119  */
120 struct rbd_req_coll {
121         int                     total;
122         int                     num_done;
123         struct kref             kref;
124         struct rbd_req_status   status[0];
125 };
126
127 /*
128  * a single io request
129  */
130 struct rbd_request {
131         struct request          *rq;            /* blk layer request */
132         struct bio              *bio;           /* cloned bio */
133         struct page             **pages;        /* list of used pages */
134         u64                     len;
135         int                     coll_index;
136         struct rbd_req_coll     *coll;
137 };
138
139 struct rbd_snap {
140         struct  device          dev;
141         const char              *name;
142         u64                     size;
143         struct list_head        node;
144         u64                     id;
145 };
146
147 /*
148  * a single device
149  */
150 struct rbd_device {
151         int                     id;             /* blkdev unique id */
152
153         int                     major;          /* blkdev assigned major */
154         struct gendisk          *disk;          /* blkdev's gendisk and rq */
155         struct request_queue    *q;
156
157         struct rbd_client       *rbd_client;
158
159         char                    name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
160
161         spinlock_t              lock;           /* queue lock */
162
163         struct rbd_image_header header;
164         char                    *obj; /* rbd image name */
165         size_t                  obj_len;
166         char                    *obj_md_name; /* hdr nm. */
167         char                    *pool_name;
168         int                     pool_id;
169
170         struct ceph_osd_event   *watch_event;
171         struct ceph_osd_request *watch_request;
172
173         /* protects updating the header */
174         struct rw_semaphore     header_rwsem;
175         char                    *snap_name;
176         u64                     snap_id;        /* current snapshot id */
177         int read_only;
178
179         struct list_head        node;
180
181         /* list of snapshots */
182         struct list_head        snaps;
183
184         /* sysfs related */
185         struct device           dev;
186 };
187
/* Serialize open/close/setup/teardown */
static DEFINE_MUTEX(ctl_mutex);

/* All rbd devices; the spinlock presumably guards the list */
static LIST_HEAD(rbd_dev_list);
static DEFINE_SPINLOCK(rbd_dev_list_lock);

/* All rbd clients; rbd_client_list_lock guards additions and removals */
static LIST_HEAD(rbd_client_list);
static DEFINE_SPINLOCK(rbd_client_list_lock);
195
196 static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
197 static void rbd_dev_release(struct device *dev);
198 static ssize_t rbd_snap_add(struct device *dev,
199                             struct device_attribute *attr,
200                             const char *buf,
201                             size_t count);
202 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
203                                   struct rbd_snap *snap);
204
205 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
206                        size_t count);
207 static ssize_t rbd_remove(struct bus_type *bus, const char *buf,
208                           size_t count);
209
210 static struct bus_attribute rbd_bus_attrs[] = {
211         __ATTR(add, S_IWUSR, NULL, rbd_add),
212         __ATTR(remove, S_IWUSR, NULL, rbd_remove),
213         __ATTR_NULL
214 };
215
216 static struct bus_type rbd_bus_type = {
217         .name           = "rbd",
218         .bus_attrs      = rbd_bus_attrs,
219 };
220
/*
 * Release callback for rbd_root_dev.  That device is a static object,
 * so there is nothing to free here.
 */
static void rbd_root_dev_release(struct device *dev)
{
	/* intentionally empty */
}
224
225 static struct device rbd_root_dev = {
226         .init_name =    "rbd",
227         .release =      rbd_root_dev_release,
228 };
229
230
231 static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
232 {
233         return get_device(&rbd_dev->dev);
234 }
235
236 static void rbd_put_dev(struct rbd_device *rbd_dev)
237 {
238         put_device(&rbd_dev->dev);
239 }
240
/* Defined later in the file */
static int __rbd_refresh_header(struct rbd_device *rbd_dev);
242
243 static int rbd_open(struct block_device *bdev, fmode_t mode)
244 {
245         struct rbd_device *rbd_dev = bdev->bd_disk->private_data;
246
247         rbd_get_dev(rbd_dev);
248
249         set_device_ro(bdev, rbd_dev->read_only);
250
251         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
252                 return -EROFS;
253
254         return 0;
255 }
256
257 static int rbd_release(struct gendisk *disk, fmode_t mode)
258 {
259         struct rbd_device *rbd_dev = disk->private_data;
260
261         rbd_put_dev(rbd_dev);
262
263         return 0;
264 }
265
266 static const struct block_device_operations rbd_bd_ops = {
267         .owner                  = THIS_MODULE,
268         .open                   = rbd_open,
269         .release                = rbd_release,
270 };
271
272 /*
273  * Initialize an rbd client instance.
274  * We own *opt.
275  */
276 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
277                                             struct rbd_options *rbd_opts)
278 {
279         struct rbd_client *rbdc;
280         int ret = -ENOMEM;
281
282         dout("rbd_client_create\n");
283         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
284         if (!rbdc)
285                 goto out_opt;
286
287         kref_init(&rbdc->kref);
288         INIT_LIST_HEAD(&rbdc->node);
289
290         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
291
292         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
293         if (IS_ERR(rbdc->client))
294                 goto out_mutex;
295         opt = NULL; /* Now rbdc->client is responsible for opt */
296
297         ret = ceph_open_session(rbdc->client);
298         if (ret < 0)
299                 goto out_err;
300
301         rbdc->rbd_opts = rbd_opts;
302
303         spin_lock(&rbd_client_list_lock);
304         list_add_tail(&rbdc->node, &rbd_client_list);
305         spin_unlock(&rbd_client_list_lock);
306
307         mutex_unlock(&ctl_mutex);
308
309         dout("rbd_client_create created %p\n", rbdc);
310         return rbdc;
311
312 out_err:
313         ceph_destroy_client(rbdc->client);
314 out_mutex:
315         mutex_unlock(&ctl_mutex);
316         kfree(rbdc);
317 out_opt:
318         if (opt)
319                 ceph_destroy_options(opt);
320         return ERR_PTR(ret);
321 }
322
323 /*
324  * Find a ceph client with specific addr and configuration.
325  */
326 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
327 {
328         struct rbd_client *client_node;
329
330         if (opt->flags & CEPH_OPT_NOSHARE)
331                 return NULL;
332
333         list_for_each_entry(client_node, &rbd_client_list, node)
334                 if (ceph_compare_options(opt, client_node->client) == 0)
335                         return client_node;
336         return NULL;
337 }
338
/*
 * Mount option tokens.  Opt_last_int and Opt_last_string are not
 * options themselves: they mark where the integer-valued and the
 * string-valued tokens end, and parse_rbd_opts_token() compares
 * against them to classify a token.
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};
349
350 static match_table_t rbdopt_tokens = {
351         {Opt_notify_timeout, "notify_timeout=%d"},
352         /* int args above */
353         /* string args above */
354         {-1, NULL}
355 };
356
357 static int parse_rbd_opts_token(char *c, void *private)
358 {
359         struct rbd_options *rbdopt = private;
360         substring_t argstr[MAX_OPT_ARGS];
361         int token, intval, ret;
362
363         token = match_token(c, rbdopt_tokens, argstr);
364         if (token < 0)
365                 return -EINVAL;
366
367         if (token < Opt_last_int) {
368                 ret = match_int(&argstr[0], &intval);
369                 if (ret < 0) {
370                         pr_err("bad mount option arg (not int) "
371                                "at '%s'\n", c);
372                         return ret;
373                 }
374                 dout("got int token %d val %d\n", token, intval);
375         } else if (token > Opt_last_int && token < Opt_last_string) {
376                 dout("got string token %d val %s\n", token,
377                      argstr[0].from);
378         } else {
379                 dout("got token %d\n", token);
380         }
381
382         switch (token) {
383         case Opt_notify_timeout:
384                 rbdopt->notify_timeout = intval;
385                 break;
386         default:
387                 BUG_ON(token);
388         }
389         return 0;
390 }
391
392 /*
393  * Get a ceph client with specific addr and configuration, if one does
394  * not exist create it.
395  */
396 static struct rbd_client *rbd_get_client(const char *mon_addr,
397                                          size_t mon_addr_len,
398                                          char *options)
399 {
400         struct rbd_client *rbdc;
401         struct ceph_options *opt;
402         struct rbd_options *rbd_opts;
403
404         rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
405         if (!rbd_opts)
406                 return ERR_PTR(-ENOMEM);
407
408         rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
409
410         opt = ceph_parse_options(options, mon_addr,
411                                 mon_addr + mon_addr_len,
412                                 parse_rbd_opts_token, rbd_opts);
413         if (IS_ERR(opt)) {
414                 kfree(rbd_opts);
415                 return ERR_CAST(opt);
416         }
417
418         spin_lock(&rbd_client_list_lock);
419         rbdc = __rbd_client_find(opt);
420         if (rbdc) {
421                 /* using an existing client */
422                 kref_get(&rbdc->kref);
423                 spin_unlock(&rbd_client_list_lock);
424
425                 ceph_destroy_options(opt);
426                 kfree(rbd_opts);
427
428                 return rbdc;
429         }
430         spin_unlock(&rbd_client_list_lock);
431
432         rbdc = rbd_client_create(opt, rbd_opts);
433
434         if (IS_ERR(rbdc))
435                 kfree(rbd_opts);
436
437         return rbdc;
438 }
439
440 /*
441  * Destroy ceph client
442  *
443  * Caller must hold rbd_client_list_lock.
444  */
445 static void rbd_client_release(struct kref *kref)
446 {
447         struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
448
449         dout("rbd_release_client %p\n", rbdc);
450         spin_lock(&rbd_client_list_lock);
451         list_del(&rbdc->node);
452         spin_unlock(&rbd_client_list_lock);
453
454         ceph_destroy_client(rbdc->client);
455         kfree(rbdc->rbd_opts);
456         kfree(rbdc);
457 }
458
459 /*
460  * Drop reference to ceph client node. If it's not referenced anymore, release
461  * it.
462  */
463 static void rbd_put_client(struct rbd_device *rbd_dev)
464 {
465         kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
466         rbd_dev->rbd_client = NULL;
467 }
468
469 /*
470  * Destroy requests collection
471  */
472 static void rbd_coll_release(struct kref *kref)
473 {
474         struct rbd_req_coll *coll =
475                 container_of(kref, struct rbd_req_coll, kref);
476
477         dout("rbd_coll_release %p\n", coll);
478         kfree(coll);
479 }
480
481 /*
482  * Create a new header structure, translate header format from the on-disk
483  * header.
484  */
485 static int rbd_header_from_disk(struct rbd_image_header *header,
486                                  struct rbd_image_header_ondisk *ondisk,
487                                  u32 allocated_snaps,
488                                  gfp_t gfp_flags)
489 {
490         u32 i, snap_count;
491
492         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
493                 return -ENXIO;
494
495         snap_count = le32_to_cpu(ondisk->snap_count);
496         if (snap_count > (UINT_MAX - sizeof(struct ceph_snap_context))
497                          / sizeof (*ondisk))
498                 return -EINVAL;
499         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
500                                 snap_count * sizeof(u64),
501                                 gfp_flags);
502         if (!header->snapc)
503                 return -ENOMEM;
504
505         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
506         if (snap_count) {
507                 header->snap_names = kmalloc(header->snap_names_len,
508                                              gfp_flags);
509                 if (!header->snap_names)
510                         goto err_snapc;
511                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
512                                              gfp_flags);
513                 if (!header->snap_sizes)
514                         goto err_names;
515         } else {
516                 header->snap_names = NULL;
517                 header->snap_sizes = NULL;
518         }
519
520         header->object_prefix = kmalloc(sizeof (ondisk->block_name) + 1,
521                                         gfp_flags);
522         if (!header->object_prefix)
523                 goto err_sizes;
524
525         memcpy(header->object_prefix, ondisk->block_name,
526                sizeof(ondisk->block_name));
527         header->object_prefix[sizeof (ondisk->block_name)] = '\0';
528
529         header->image_size = le64_to_cpu(ondisk->image_size);
530         header->obj_order = ondisk->options.order;
531         header->crypt_type = ondisk->options.crypt_type;
532         header->comp_type = ondisk->options.comp_type;
533
534         atomic_set(&header->snapc->nref, 1);
535         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
536         header->snapc->num_snaps = snap_count;
537         header->total_snaps = snap_count;
538
539         if (snap_count && allocated_snaps == snap_count) {
540                 for (i = 0; i < snap_count; i++) {
541                         header->snapc->snaps[i] =
542                                 le64_to_cpu(ondisk->snaps[i].id);
543                         header->snap_sizes[i] =
544                                 le64_to_cpu(ondisk->snaps[i].image_size);
545                 }
546
547                 /* copy snapshot names */
548                 memcpy(header->snap_names, &ondisk->snaps[i],
549                         header->snap_names_len);
550         }
551
552         return 0;
553
554 err_sizes:
555         kfree(header->snap_sizes);
556 err_names:
557         kfree(header->snap_names);
558 err_snapc:
559         kfree(header->snapc);
560         return -ENOMEM;
561 }
562
563 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
564                         u64 *seq, u64 *size)
565 {
566         int i;
567         char *p = header->snap_names;
568
569         for (i = 0; i < header->total_snaps; i++) {
570                 if (!strcmp(snap_name, p)) {
571
572                         /* Found it.  Pass back its id and/or size */
573
574                         if (seq)
575                                 *seq = header->snapc->snaps[i];
576                         if (size)
577                                 *size = header->snap_sizes[i];
578                         return i;
579                 }
580                 p += strlen(p) + 1;     /* Skip ahead to the next name */
581         }
582         return -ENOENT;
583 }
584
585 static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
586 {
587         struct rbd_image_header *header = &dev->header;
588         struct ceph_snap_context *snapc = header->snapc;
589         int ret = -ENOENT;
590
591         down_write(&dev->header_rwsem);
592
593         if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
594                     sizeof (RBD_SNAP_HEAD_NAME))) {
595                 if (header->total_snaps)
596                         snapc->seq = header->snap_seq;
597                 else
598                         snapc->seq = 0;
599                 dev->snap_id = CEPH_NOSNAP;
600                 dev->read_only = 0;
601                 if (size)
602                         *size = header->image_size;
603         } else {
604                 ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
605                 if (ret < 0)
606                         goto done;
607                 dev->snap_id = snapc->seq;
608                 dev->read_only = 1;
609         }
610
611         ret = 0;
612 done:
613         up_write(&dev->header_rwsem);
614         return ret;
615 }
616
617 static void rbd_header_free(struct rbd_image_header *header)
618 {
619         kfree(header->object_prefix);
620         kfree(header->snap_sizes);
621         kfree(header->snap_names);
622         kfree(header->snapc);
623 }
624
625 /*
626  * get the actual striped segment name, offset and length
627  */
628 static u64 rbd_get_segment(struct rbd_image_header *header,
629                            const char *object_prefix,
630                            u64 ofs, u64 len,
631                            char *seg_name, u64 *segofs)
632 {
633         u64 seg = ofs >> header->obj_order;
634
635         if (seg_name)
636                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
637                          "%s.%012llx", object_prefix, seg);
638
639         ofs = ofs & ((1 << header->obj_order) - 1);
640         len = min_t(u64, len, (1 << header->obj_order) - ofs);
641
642         if (segofs)
643                 *segofs = ofs;
644
645         return len;
646 }
647
648 static int rbd_get_num_segments(struct rbd_image_header *header,
649                                 u64 ofs, u64 len)
650 {
651         u64 start_seg = ofs >> header->obj_order;
652         u64 end_seg = (ofs + len - 1) >> header->obj_order;
653         return end_seg - start_seg + 1;
654 }
655
656 /*
657  * returns the size of an object in the image
658  */
659 static u64 rbd_obj_bytes(struct rbd_image_header *header)
660 {
661         return 1 << header->obj_order;
662 }
663
664 /*
665  * bio helpers
666  */
667
668 static void bio_chain_put(struct bio *chain)
669 {
670         struct bio *tmp;
671
672         while (chain) {
673                 tmp = chain;
674                 chain = chain->bi_next;
675                 bio_put(tmp);
676         }
677 }
678
679 /*
680  * zeros a bio chain, starting at specific offset
681  */
682 static void zero_bio_chain(struct bio *chain, int start_ofs)
683 {
684         struct bio_vec *bv;
685         unsigned long flags;
686         void *buf;
687         int i;
688         int pos = 0;
689
690         while (chain) {
691                 bio_for_each_segment(bv, chain, i) {
692                         if (pos + bv->bv_len > start_ofs) {
693                                 int remainder = max(start_ofs - pos, 0);
694                                 buf = bvec_kmap_irq(bv, &flags);
695                                 memset(buf + remainder, 0,
696                                        bv->bv_len - remainder);
697                                 bvec_kunmap_irq(buf, &flags);
698                         }
699                         pos += bv->bv_len;
700                 }
701
702                 chain = chain->bi_next;
703         }
704 }
705
706 /*
707  * bio_chain_clone - clone a chain of bios up to a certain length.
708  * might return a bio_pair that will need to be released.
709  */
710 static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
711                                    struct bio_pair **bp,
712                                    int len, gfp_t gfpmask)
713 {
714         struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
715         int total = 0;
716
717         if (*bp) {
718                 bio_pair_release(*bp);
719                 *bp = NULL;
720         }
721
722         while (old_chain && (total < len)) {
723                 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
724                 if (!tmp)
725                         goto err_out;
726
727                 if (total + old_chain->bi_size > len) {
728                         struct bio_pair *bp;
729
730                         /*
731                          * this split can only happen with a single paged bio,
732                          * split_bio will BUG_ON if this is not the case
733                          */
734                         dout("bio_chain_clone split! total=%d remaining=%d"
735                              "bi_size=%d\n",
736                              (int)total, (int)len-total,
737                              (int)old_chain->bi_size);
738
739                         /* split the bio. We'll release it either in the next
740                            call, or it will have to be released outside */
741                         bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
742                         if (!bp)
743                                 goto err_out;
744
745                         __bio_clone(tmp, &bp->bio1);
746
747                         *next = &bp->bio2;
748                 } else {
749                         __bio_clone(tmp, old_chain);
750                         *next = old_chain->bi_next;
751                 }
752
753                 tmp->bi_bdev = NULL;
754                 gfpmask &= ~__GFP_WAIT;
755                 tmp->bi_next = NULL;
756
757                 if (!new_chain) {
758                         new_chain = tail = tmp;
759                 } else {
760                         tail->bi_next = tmp;
761                         tail = tmp;
762                 }
763                 old_chain = old_chain->bi_next;
764
765                 total += tmp->bi_size;
766         }
767
768         BUG_ON(total < len);
769
770         if (tail)
771                 tail->bi_next = NULL;
772
773         *old = old_chain;
774
775         return new_chain;
776
777 err_out:
778         dout("bio_chain_clone with err\n");
779         bio_chain_put(new_chain);
780         return NULL;
781 }
782
783 /*
784  * helpers for osd request op vectors.
785  */
786 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
787                             int num_ops,
788                             int opcode,
789                             u32 payload_len)
790 {
791         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
792                        GFP_NOIO);
793         if (!*ops)
794                 return -ENOMEM;
795         (*ops)[0].op = opcode;
796         /*
797          * op extent offset and length will be set later on
798          * in calc_raw_layout()
799          */
800         (*ops)[0].payload_len = payload_len;
801         return 0;
802 }
803
/*
 * Free an op vector allocated by rbd_create_rw_ops().  NULL is fine.
 */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
808
809 static void rbd_coll_end_req_index(struct request *rq,
810                                    struct rbd_req_coll *coll,
811                                    int index,
812                                    int ret, u64 len)
813 {
814         struct request_queue *q;
815         int min, max, i;
816
817         dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
818              coll, index, ret, len);
819
820         if (!rq)
821                 return;
822
823         if (!coll) {
824                 blk_end_request(rq, ret, len);
825                 return;
826         }
827
828         q = rq->q;
829
830         spin_lock_irq(q->queue_lock);
831         coll->status[index].done = 1;
832         coll->status[index].rc = ret;
833         coll->status[index].bytes = len;
834         max = min = coll->num_done;
835         while (max < coll->total && coll->status[max].done)
836                 max++;
837
838         for (i = min; i<max; i++) {
839                 __blk_end_request(rq, coll->status[i].rc,
840                                   coll->status[i].bytes);
841                 coll->num_done++;
842                 kref_put(&coll->kref, rbd_coll_release);
843         }
844         spin_unlock_irq(q->queue_lock);
845 }
846
847 static void rbd_coll_end_req(struct rbd_request *req,
848                              int ret, u64 len)
849 {
850         rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
851 }
852
853 /*
854  * Send ceph osd request
855  */
856 static int rbd_do_request(struct request *rq,
857                           struct rbd_device *dev,
858                           struct ceph_snap_context *snapc,
859                           u64 snapid,
860                           const char *obj, u64 ofs, u64 len,
861                           struct bio *bio,
862                           struct page **pages,
863                           int num_pages,
864                           int flags,
865                           struct ceph_osd_req_op *ops,
866                           int num_reply,
867                           struct rbd_req_coll *coll,
868                           int coll_index,
869                           void (*rbd_cb)(struct ceph_osd_request *req,
870                                          struct ceph_msg *msg),
871                           struct ceph_osd_request **linger_req,
872                           u64 *ver)
873 {
874         struct ceph_osd_request *req;
875         struct ceph_file_layout *layout;
876         int ret;
877         u64 bno;
878         struct timespec mtime = CURRENT_TIME;
879         struct rbd_request *req_data;
880         struct ceph_osd_request_head *reqhead;
881         struct ceph_osd_client *osdc;
882
883         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
884         if (!req_data) {
885                 if (coll)
886                         rbd_coll_end_req_index(rq, coll, coll_index,
887                                                -ENOMEM, len);
888                 return -ENOMEM;
889         }
890
891         if (coll) {
892                 req_data->coll = coll;
893                 req_data->coll_index = coll_index;
894         }
895
896         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
897
898         down_read(&dev->header_rwsem);
899
900         osdc = &dev->rbd_client->client->osdc;
901         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
902                                         false, GFP_NOIO, pages, bio);
903         if (!req) {
904                 up_read(&dev->header_rwsem);
905                 ret = -ENOMEM;
906                 goto done_pages;
907         }
908
909         req->r_callback = rbd_cb;
910
911         req_data->rq = rq;
912         req_data->bio = bio;
913         req_data->pages = pages;
914         req_data->len = len;
915
916         req->r_priv = req_data;
917
918         reqhead = req->r_request->front.iov_base;
919         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
920
921         strncpy(req->r_oid, obj, sizeof(req->r_oid));
922         req->r_oid_len = strlen(req->r_oid);
923
924         layout = &req->r_file_layout;
925         memset(layout, 0, sizeof(*layout));
926         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
927         layout->fl_stripe_count = cpu_to_le32(1);
928         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
929         layout->fl_pg_pool = cpu_to_le32(dev->pool_id);
930         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
931                                 req, ops);
932
933         ceph_osdc_build_request(req, ofs, &len,
934                                 ops,
935                                 snapc,
936                                 &mtime,
937                                 req->r_oid, req->r_oid_len);
938         up_read(&dev->header_rwsem);
939
940         if (linger_req) {
941                 ceph_osdc_set_request_linger(osdc, req);
942                 *linger_req = req;
943         }
944
945         ret = ceph_osdc_start_request(osdc, req, false);
946         if (ret < 0)
947                 goto done_err;
948
949         if (!rbd_cb) {
950                 ret = ceph_osdc_wait_request(osdc, req);
951                 if (ver)
952                         *ver = le64_to_cpu(req->r_reassert_version.version);
953                 dout("reassert_ver=%lld\n",
954                      le64_to_cpu(req->r_reassert_version.version));
955                 ceph_osdc_put_request(req);
956         }
957         return ret;
958
959 done_err:
960         bio_chain_put(req_data->bio);
961         ceph_osdc_put_request(req);
962 done_pages:
963         rbd_coll_end_req(req_data, ret, len);
964         kfree(req_data);
965         return ret;
966 }
967
968 /*
969  * Ceph osd op callback
970  */
971 static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
972 {
973         struct rbd_request *req_data = req->r_priv;
974         struct ceph_osd_reply_head *replyhead;
975         struct ceph_osd_op *op;
976         __s32 rc;
977         u64 bytes;
978         int read_op;
979
980         /* parse reply */
981         replyhead = msg->front.iov_base;
982         WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
983         op = (void *)(replyhead + 1);
984         rc = le32_to_cpu(replyhead->result);
985         bytes = le64_to_cpu(op->extent.length);
986         read_op = (le16_to_cpu(op->op) == CEPH_OSD_OP_READ);
987
988         dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
989
990         if (rc == -ENOENT && read_op) {
991                 zero_bio_chain(req_data->bio, 0);
992                 rc = 0;
993         } else if (rc == 0 && read_op && bytes < req_data->len) {
994                 zero_bio_chain(req_data->bio, bytes);
995                 bytes = req_data->len;
996         }
997
998         rbd_coll_end_req(req_data, rc, bytes);
999
1000         if (req_data->bio)
1001                 bio_chain_put(req_data->bio);
1002
1003         ceph_osdc_put_request(req);
1004         kfree(req_data);
1005 }
1006
/*
 * Minimal OSD completion callback: the reply is not inspected, the
 * request reference is simply dropped.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
1011
1012 /*
1013  * Do a synchronous ceph osd operation
1014  */
1015 static int rbd_req_sync_op(struct rbd_device *dev,
1016                            struct ceph_snap_context *snapc,
1017                            u64 snapid,
1018                            int opcode,
1019                            int flags,
1020                            struct ceph_osd_req_op *orig_ops,
1021                            int num_reply,
1022                            const char *obj,
1023                            u64 ofs, u64 len,
1024                            char *buf,
1025                            struct ceph_osd_request **linger_req,
1026                            u64 *ver)
1027 {
1028         int ret;
1029         struct page **pages;
1030         int num_pages;
1031         struct ceph_osd_req_op *ops = orig_ops;
1032         u32 payload_len;
1033
1034         num_pages = calc_pages_for(ofs , len);
1035         pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
1036         if (IS_ERR(pages))
1037                 return PTR_ERR(pages);
1038
1039         if (!orig_ops) {
1040                 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
1041                 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1042                 if (ret < 0)
1043                         goto done;
1044
1045                 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
1046                         ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
1047                         if (ret < 0)
1048                                 goto done_ops;
1049                 }
1050         }
1051
1052         ret = rbd_do_request(NULL, dev, snapc, snapid,
1053                           obj, ofs, len, NULL,
1054                           pages, num_pages,
1055                           flags,
1056                           ops,
1057                           2,
1058                           NULL, 0,
1059                           NULL,
1060                           linger_req, ver);
1061         if (ret < 0)
1062                 goto done_ops;
1063
1064         if ((flags & CEPH_OSD_FLAG_READ) && buf)
1065                 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
1066
1067 done_ops:
1068         if (!orig_ops)
1069                 rbd_destroy_ops(ops);
1070 done:
1071         ceph_release_page_vector(pages, num_pages);
1072         return ret;
1073 }
1074
1075 /*
1076  * Do an asynchronous ceph osd operation
1077  */
1078 static int rbd_do_op(struct request *rq,
1079                      struct rbd_device *rbd_dev ,
1080                      struct ceph_snap_context *snapc,
1081                      u64 snapid,
1082                      int opcode, int flags, int num_reply,
1083                      u64 ofs, u64 len,
1084                      struct bio *bio,
1085                      struct rbd_req_coll *coll,
1086                      int coll_index)
1087 {
1088         char *seg_name;
1089         u64 seg_ofs;
1090         u64 seg_len;
1091         int ret;
1092         struct ceph_osd_req_op *ops;
1093         u32 payload_len;
1094
1095         seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1096         if (!seg_name)
1097                 return -ENOMEM;
1098
1099         seg_len = rbd_get_segment(&rbd_dev->header,
1100                                   rbd_dev->header.object_prefix,
1101                                   ofs, len,
1102                                   seg_name, &seg_ofs);
1103
1104         payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
1105
1106         ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
1107         if (ret < 0)
1108                 goto done;
1109
1110         /* we've taken care of segment sizes earlier when we
1111            cloned the bios. We should never have a segment
1112            truncated at this point */
1113         BUG_ON(seg_len < len);
1114
1115         ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
1116                              seg_name, seg_ofs, seg_len,
1117                              bio,
1118                              NULL, 0,
1119                              flags,
1120                              ops,
1121                              num_reply,
1122                              coll, coll_index,
1123                              rbd_req_cb, 0, NULL);
1124
1125         rbd_destroy_ops(ops);
1126 done:
1127         kfree(seg_name);
1128         return ret;
1129 }
1130
1131 /*
1132  * Request async osd write
1133  */
1134 static int rbd_req_write(struct request *rq,
1135                          struct rbd_device *rbd_dev,
1136                          struct ceph_snap_context *snapc,
1137                          u64 ofs, u64 len,
1138                          struct bio *bio,
1139                          struct rbd_req_coll *coll,
1140                          int coll_index)
1141 {
1142         return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
1143                          CEPH_OSD_OP_WRITE,
1144                          CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1145                          2,
1146                          ofs, len, bio, coll, coll_index);
1147 }
1148
1149 /*
1150  * Request async osd read
1151  */
1152 static int rbd_req_read(struct request *rq,
1153                          struct rbd_device *rbd_dev,
1154                          u64 snapid,
1155                          u64 ofs, u64 len,
1156                          struct bio *bio,
1157                          struct rbd_req_coll *coll,
1158                          int coll_index)
1159 {
1160         return rbd_do_op(rq, rbd_dev, NULL,
1161                          snapid,
1162                          CEPH_OSD_OP_READ,
1163                          CEPH_OSD_FLAG_READ,
1164                          2,
1165                          ofs, len, bio, coll, coll_index);
1166 }
1167
1168 /*
1169  * Request sync osd read
1170  */
1171 static int rbd_req_sync_read(struct rbd_device *dev,
1172                           struct ceph_snap_context *snapc,
1173                           u64 snapid,
1174                           const char *obj,
1175                           u64 ofs, u64 len,
1176                           char *buf,
1177                           u64 *ver)
1178 {
1179         return rbd_req_sync_op(dev, NULL,
1180                                snapid,
1181                                CEPH_OSD_OP_READ,
1182                                CEPH_OSD_FLAG_READ,
1183                                NULL,
1184                                1, obj, ofs, len, buf, NULL, ver);
1185 }
1186
1187 /*
1188  * Request sync osd watch
1189  */
1190 static int rbd_req_sync_notify_ack(struct rbd_device *dev,
1191                                    u64 ver,
1192                                    u64 notify_id,
1193                                    const char *obj)
1194 {
1195         struct ceph_osd_req_op *ops;
1196         int ret;
1197
1198         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
1199         if (ret < 0)
1200                 return ret;
1201
1202         ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
1203         ops[0].watch.cookie = notify_id;
1204         ops[0].watch.flag = 0;
1205
1206         ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
1207                           obj, 0, 0, NULL,
1208                           NULL, 0,
1209                           CEPH_OSD_FLAG_READ,
1210                           ops,
1211                           1,
1212                           NULL, 0,
1213                           rbd_simple_req_cb, 0, NULL);
1214
1215         rbd_destroy_ops(ops);
1216         return ret;
1217 }
1218
1219 static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1220 {
1221         struct rbd_device *dev = (struct rbd_device *)data;
1222         int rc;
1223
1224         if (!dev)
1225                 return;
1226
1227         dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1228                 notify_id, (int)opcode);
1229         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1230         rc = __rbd_refresh_header(dev);
1231         mutex_unlock(&ctl_mutex);
1232         if (rc)
1233                 pr_warning(RBD_DRV_NAME "%d got notification but failed to "
1234                            " update snaps: %d\n", dev->major, rc);
1235
1236         rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
1237 }
1238
1239 /*
1240  * Request sync osd watch
1241  */
1242 static int rbd_req_sync_watch(struct rbd_device *dev,
1243                               const char *obj,
1244                               u64 ver)
1245 {
1246         struct ceph_osd_req_op *ops;
1247         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1248
1249         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1250         if (ret < 0)
1251                 return ret;
1252
1253         ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
1254                                      (void *)dev, &dev->watch_event);
1255         if (ret < 0)
1256                 goto fail;
1257
1258         ops[0].watch.ver = cpu_to_le64(ver);
1259         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1260         ops[0].watch.flag = 1;
1261
1262         ret = rbd_req_sync_op(dev, NULL,
1263                               CEPH_NOSNAP,
1264                               0,
1265                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1266                               ops,
1267                               1, obj, 0, 0, NULL,
1268                               &dev->watch_request, NULL);
1269
1270         if (ret < 0)
1271                 goto fail_event;
1272
1273         rbd_destroy_ops(ops);
1274         return 0;
1275
1276 fail_event:
1277         ceph_osdc_cancel_event(dev->watch_event);
1278         dev->watch_event = NULL;
1279 fail:
1280         rbd_destroy_ops(ops);
1281         return ret;
1282 }
1283
1284 /*
1285  * Request sync osd unwatch
1286  */
1287 static int rbd_req_sync_unwatch(struct rbd_device *dev,
1288                                 const char *obj)
1289 {
1290         struct ceph_osd_req_op *ops;
1291
1292         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
1293         if (ret < 0)
1294                 return ret;
1295
1296         ops[0].watch.ver = 0;
1297         ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
1298         ops[0].watch.flag = 0;
1299
1300         ret = rbd_req_sync_op(dev, NULL,
1301                               CEPH_NOSNAP,
1302                               0,
1303                               CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1304                               ops,
1305                               1, obj, 0, 0, NULL, NULL, NULL);
1306
1307         rbd_destroy_ops(ops);
1308         ceph_osdc_cancel_event(dev->watch_event);
1309         dev->watch_event = NULL;
1310         return ret;
1311 }
1312
/* Holds the rbd device pointer associated with a notify request. */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1316
1317 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1318 {
1319         struct rbd_device *dev = (struct rbd_device *)data;
1320         if (!dev)
1321                 return;
1322
1323         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1324                 notify_id, (int)opcode);
1325 }
1326
1327 /*
1328  * Request sync osd notify
1329  */
1330 static int rbd_req_sync_notify(struct rbd_device *dev,
1331                           const char *obj)
1332 {
1333         struct ceph_osd_req_op *ops;
1334         struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
1335         struct ceph_osd_event *event;
1336         struct rbd_notify_info info;
1337         int payload_len = sizeof(u32) + sizeof(u32);
1338         int ret;
1339
1340         ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
1341         if (ret < 0)
1342                 return ret;
1343
1344         info.dev = dev;
1345
1346         ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
1347                                      (void *)&info, &event);
1348         if (ret < 0)
1349                 goto fail;
1350
1351         ops[0].watch.ver = 1;
1352         ops[0].watch.flag = 1;
1353         ops[0].watch.cookie = event->cookie;
1354         ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
1355         ops[0].watch.timeout = 12;
1356
1357         ret = rbd_req_sync_op(dev, NULL,
1358                                CEPH_NOSNAP,
1359                                0,
1360                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1361                                ops,
1362                                1, obj, 0, 0, NULL, NULL, NULL);
1363         if (ret < 0)
1364                 goto fail_event;
1365
1366         ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
1367         dout("ceph_osdc_wait_event returned %d\n", ret);
1368         rbd_destroy_ops(ops);
1369         return 0;
1370
1371 fail_event:
1372         ceph_osdc_cancel_event(event);
1373 fail:
1374         rbd_destroy_ops(ops);
1375         return ret;
1376 }
1377
1378 /*
1379  * Request sync osd read
1380  */
1381 static int rbd_req_sync_exec(struct rbd_device *dev,
1382                              const char *obj,
1383                              const char *cls,
1384                              const char *method,
1385                              const char *data,
1386                              int len,
1387                              u64 *ver)
1388 {
1389         struct ceph_osd_req_op *ops;
1390         int cls_len = strlen(cls);
1391         int method_len = strlen(method);
1392         int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1393                                     cls_len + method_len + len);
1394         if (ret < 0)
1395                 return ret;
1396
1397         ops[0].cls.class_name = cls;
1398         ops[0].cls.class_len = (__u8)cls_len;
1399         ops[0].cls.method_name = method;
1400         ops[0].cls.method_len = (__u8)method_len;
1401         ops[0].cls.argc = 0;
1402         ops[0].cls.indata = data;
1403         ops[0].cls.indata_len = len;
1404
1405         ret = rbd_req_sync_op(dev, NULL,
1406                                CEPH_NOSNAP,
1407                                0,
1408                                CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1409                                ops,
1410                                1, obj, 0, 0, NULL, NULL, ver);
1411
1412         rbd_destroy_ops(ops);
1413
1414         dout("cls_exec returned %d\n", ret);
1415         return ret;
1416 }
1417
1418 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1419 {
1420         struct rbd_req_coll *coll =
1421                         kzalloc(sizeof(struct rbd_req_coll) +
1422                                 sizeof(struct rbd_req_status) * num_reqs,
1423                                 GFP_ATOMIC);
1424
1425         if (!coll)
1426                 return NULL;
1427         coll->total = num_reqs;
1428         kref_init(&coll->kref);
1429         return coll;
1430 }
1431
1432 /*
1433  * block device queue callback
1434  */
1435 static void rbd_rq_fn(struct request_queue *q)
1436 {
1437         struct rbd_device *rbd_dev = q->queuedata;
1438         struct request *rq;
1439         struct bio_pair *bp = NULL;
1440
1441         while ((rq = blk_fetch_request(q))) {
1442                 struct bio *bio;
1443                 struct bio *rq_bio, *next_bio = NULL;
1444                 bool do_write;
1445                 int size, op_size = 0;
1446                 u64 ofs;
1447                 int num_segs, cur_seg = 0;
1448                 struct rbd_req_coll *coll;
1449
1450                 /* peek at request from block layer */
1451                 if (!rq)
1452                         break;
1453
1454                 dout("fetched request\n");
1455
1456                 /* filter out block requests we don't understand */
1457                 if ((rq->cmd_type != REQ_TYPE_FS)) {
1458                         __blk_end_request_all(rq, 0);
1459                         continue;
1460                 }
1461
1462                 /* deduce our operation (read, write) */
1463                 do_write = (rq_data_dir(rq) == WRITE);
1464
1465                 size = blk_rq_bytes(rq);
1466                 ofs = blk_rq_pos(rq) * SECTOR_SIZE;
1467                 rq_bio = rq->bio;
1468                 if (do_write && rbd_dev->read_only) {
1469                         __blk_end_request_all(rq, -EROFS);
1470                         continue;
1471                 }
1472
1473                 spin_unlock_irq(q->queue_lock);
1474
1475                 dout("%s 0x%x bytes at 0x%llx\n",
1476                      do_write ? "write" : "read",
1477                      size, blk_rq_pos(rq) * SECTOR_SIZE);
1478
1479                 num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
1480                 coll = rbd_alloc_coll(num_segs);
1481                 if (!coll) {
1482                         spin_lock_irq(q->queue_lock);
1483                         __blk_end_request_all(rq, -ENOMEM);
1484                         continue;
1485                 }
1486
1487                 do {
1488                         /* a bio clone to be passed down to OSD req */
1489                         dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1490                         op_size = rbd_get_segment(&rbd_dev->header,
1491                                                   rbd_dev->header.object_prefix,
1492                                                   ofs, size,
1493                                                   NULL, NULL);
1494                         kref_get(&coll->kref);
1495                         bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1496                                               op_size, GFP_ATOMIC);
1497                         if (!bio) {
1498                                 rbd_coll_end_req_index(rq, coll, cur_seg,
1499                                                        -ENOMEM, op_size);
1500                                 goto next_seg;
1501                         }
1502
1503
1504                         /* init OSD command: write or read */
1505                         if (do_write)
1506                                 rbd_req_write(rq, rbd_dev,
1507                                               rbd_dev->header.snapc,
1508                                               ofs,
1509                                               op_size, bio,
1510                                               coll, cur_seg);
1511                         else
1512                                 rbd_req_read(rq, rbd_dev,
1513                                              rbd_dev->snap_id,
1514                                              ofs,
1515                                              op_size, bio,
1516                                              coll, cur_seg);
1517
1518 next_seg:
1519                         size -= op_size;
1520                         ofs += op_size;
1521
1522                         cur_seg++;
1523                         rq_bio = next_bio;
1524                 } while (size > 0);
1525                 kref_put(&coll->kref, rbd_coll_release);
1526
1527                 if (bp)
1528                         bio_pair_release(bp);
1529                 spin_lock_irq(q->queue_lock);
1530         }
1531 }
1532
1533 /*
1534  * a queue callback. Makes sure that we don't create a bio that spans across
1535  * multiple osd objects. One exception would be with a single page bios,
1536  * which we handle later at bio_chain_clone
1537  */
1538 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1539                           struct bio_vec *bvec)
1540 {
1541         struct rbd_device *rbd_dev = q->queuedata;
1542         unsigned int chunk_sectors;
1543         sector_t sector;
1544         unsigned int bio_sectors;
1545         int max;
1546
1547         chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
1548         sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1549         bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
1550
1551         max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
1552                                  + bio_sectors)) << SECTOR_SHIFT;
1553         if (max < 0)
1554                 max = 0; /* bio_add cannot handle a negative return */
1555         if (max <= bvec->bv_len && bio_sectors == 0)
1556                 return bvec->bv_len;
1557         return max;
1558 }
1559
1560 static void rbd_free_disk(struct rbd_device *rbd_dev)
1561 {
1562         struct gendisk *disk = rbd_dev->disk;
1563
1564         if (!disk)
1565                 return;
1566
1567         rbd_header_free(&rbd_dev->header);
1568
1569         if (disk->flags & GENHD_FL_UP)
1570                 del_gendisk(disk);
1571         if (disk->queue)
1572                 blk_cleanup_queue(disk->queue);
1573         put_disk(disk);
1574 }
1575
1576 /*
1577  * reload the ondisk the header 
1578  */
1579 static int rbd_read_header(struct rbd_device *rbd_dev,
1580                            struct rbd_image_header *header)
1581 {
1582         ssize_t rc;
1583         struct rbd_image_header_ondisk *dh;
1584         u32 snap_count = 0;
1585         u64 ver;
1586         size_t len;
1587
1588         /*
1589          * First reads the fixed-size header to determine the number
1590          * of snapshots, then re-reads it, along with all snapshot
1591          * records as well as their stored names.
1592          */
1593         len = sizeof (*dh);
1594         while (1) {
1595                 dh = kmalloc(len, GFP_KERNEL);
1596                 if (!dh)
1597                         return -ENOMEM;
1598
1599                 rc = rbd_req_sync_read(rbd_dev,
1600                                        NULL, CEPH_NOSNAP,
1601                                        rbd_dev->obj_md_name,
1602                                        0, len,
1603                                        (char *)dh, &ver);
1604                 if (rc < 0)
1605                         goto out_dh;
1606
1607                 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1608                 if (rc < 0) {
1609                         if (rc == -ENXIO)
1610                                 pr_warning("unrecognized header format"
1611                                            " for image %s", rbd_dev->obj);
1612                         goto out_dh;
1613                 }
1614
1615                 if (snap_count == header->total_snaps)
1616                         break;
1617
1618                 snap_count = header->total_snaps;
1619                 len = sizeof (*dh) +
1620                         snap_count * sizeof(struct rbd_image_snap_ondisk) +
1621                         header->snap_names_len;
1622
1623                 rbd_header_free(header);
1624                 kfree(dh);
1625         }
1626         header->obj_version = ver;
1627
1628 out_dh:
1629         kfree(dh);
1630         return rc;
1631 }
1632
1633 /*
1634  * create a snapshot
1635  */
1636 static int rbd_header_add_snap(struct rbd_device *dev,
1637                                const char *snap_name,
1638                                gfp_t gfp_flags)
1639 {
1640         int name_len = strlen(snap_name);
1641         u64 new_snapid;
1642         int ret;
1643         void *data, *p, *e;
1644         u64 ver;
1645         struct ceph_mon_client *monc;
1646
1647         /* we should create a snapshot only if we're pointing at the head */
1648         if (dev->snap_id != CEPH_NOSNAP)
1649                 return -EINVAL;
1650
1651         monc = &dev->rbd_client->client->monc;
1652         ret = ceph_monc_create_snapid(monc, dev->pool_id, &new_snapid);
1653         dout("created snapid=%lld\n", new_snapid);
1654         if (ret < 0)
1655                 return ret;
1656
1657         data = kmalloc(name_len + 16, gfp_flags);
1658         if (!data)
1659                 return -ENOMEM;
1660
1661         p = data;
1662         e = data + name_len + 16;
1663
1664         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1665         ceph_encode_64_safe(&p, e, new_snapid, bad);
1666
1667         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1668                                 data, p - data, &ver);
1669
1670         kfree(data);
1671
1672         if (ret < 0)
1673                 return ret;
1674
1675         down_write(&dev->header_rwsem);
1676         dev->header.snapc->seq = new_snapid;
1677         up_write(&dev->header_rwsem);
1678
1679         return 0;
1680 bad:
1681         return -ERANGE;
1682 }
1683
1684 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1685 {
1686         struct rbd_snap *snap;
1687
1688         while (!list_empty(&rbd_dev->snaps)) {
1689                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1690                 __rbd_remove_snap_dev(rbd_dev, snap);
1691         }
1692 }
1693
1694 /*
1695  * only read the first part of the ondisk header, without the snaps info
1696  */
1697 static int __rbd_refresh_header(struct rbd_device *rbd_dev)
1698 {
1699         int ret;
1700         struct rbd_image_header h;
1701         u64 snap_seq;
1702         int follow_seq = 0;
1703
1704         ret = rbd_read_header(rbd_dev, &h);
1705         if (ret < 0)
1706                 return ret;
1707
1708         /* resized? */
1709         set_capacity(rbd_dev->disk, h.image_size / SECTOR_SIZE);
1710
1711         down_write(&rbd_dev->header_rwsem);
1712
1713         snap_seq = rbd_dev->header.snapc->seq;
1714         if (rbd_dev->header.total_snaps &&
1715             rbd_dev->header.snapc->snaps[0] == snap_seq)
1716                 /* pointing at the head, will need to follow that
1717                    if head moves */
1718                 follow_seq = 1;
1719
1720         /* rbd_dev->header.object_prefix shouldn't change */
1721         kfree(rbd_dev->header.snap_sizes);
1722         kfree(rbd_dev->header.snap_names);
1723         kfree(rbd_dev->header.snapc);
1724
1725         rbd_dev->header.total_snaps = h.total_snaps;
1726         rbd_dev->header.snapc = h.snapc;
1727         rbd_dev->header.snap_names = h.snap_names;
1728         rbd_dev->header.snap_names_len = h.snap_names_len;
1729         rbd_dev->header.snap_sizes = h.snap_sizes;
1730         /* Free the extra copy of the object prefix */
1731         WARN_ON(strcmp(rbd_dev->header.object_prefix, h.object_prefix));
1732         kfree(h.object_prefix);
1733
1734         if (follow_seq)
1735                 rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
1736         else
1737                 rbd_dev->header.snapc->seq = snap_seq;
1738
1739         ret = __rbd_init_snaps_header(rbd_dev);
1740
1741         up_write(&rbd_dev->header_rwsem);
1742
1743         return ret;
1744 }
1745
1746 static int rbd_init_disk(struct rbd_device *rbd_dev)
1747 {
1748         struct gendisk *disk;
1749         struct request_queue *q;
1750         int rc;
1751         u64 segment_size;
1752         u64 total_size = 0;
1753
1754         /* contact OSD, request size info about the object being mapped */
1755         rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1756         if (rc)
1757                 return rc;
1758
1759         /* no need to lock here, as rbd_dev is not registered yet */
1760         rc = __rbd_init_snaps_header(rbd_dev);
1761         if (rc)
1762                 return rc;
1763
1764         rc = rbd_header_set_snap(rbd_dev, &total_size);
1765         if (rc)
1766                 return rc;
1767
1768         /* create gendisk info */
1769         rc = -ENOMEM;
1770         disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1771         if (!disk)
1772                 goto out;
1773
1774         snprintf(disk->disk_name, sizeof(disk->disk_name), RBD_DRV_NAME "%d",
1775                  rbd_dev->id);
1776         disk->major = rbd_dev->major;
1777         disk->first_minor = 0;
1778         disk->fops = &rbd_bd_ops;
1779         disk->private_data = rbd_dev;
1780
1781         /* init rq */
1782         rc = -ENOMEM;
1783         q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1784         if (!q)
1785                 goto out_disk;
1786
1787         /* We use the default size, but let's be explicit about it. */
1788         blk_queue_physical_block_size(q, SECTOR_SIZE);
1789
1790         /* set io sizes to object size */
1791         segment_size = rbd_obj_bytes(&rbd_dev->header);
1792         blk_queue_max_hw_sectors(q, segment_size / SECTOR_SIZE);
1793         blk_queue_max_segment_size(q, segment_size);
1794         blk_queue_io_min(q, segment_size);
1795         blk_queue_io_opt(q, segment_size);
1796
1797         blk_queue_merge_bvec(q, rbd_merge_bvec);
1798         disk->queue = q;
1799
1800         q->queuedata = rbd_dev;
1801
1802         rbd_dev->disk = disk;
1803         rbd_dev->q = q;
1804
1805         /* finally, announce the disk to the world */
1806         set_capacity(disk, total_size / SECTOR_SIZE);
1807         add_disk(disk);
1808
1809         pr_info("%s: added with size 0x%llx\n",
1810                 disk->disk_name, (unsigned long long)total_size);
1811         return 0;
1812
1813 out_disk:
1814         put_disk(disk);
1815 out:
1816         return rc;
1817 }
1818
1819 /*
1820   sysfs
1821 */
1822
1823 static struct rbd_device *dev_to_rbd_dev(struct device *dev)
1824 {
1825         return container_of(dev, struct rbd_device, dev);
1826 }
1827
1828 static ssize_t rbd_size_show(struct device *dev,
1829                              struct device_attribute *attr, char *buf)
1830 {
1831         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1832
1833         return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
1834 }
1835
1836 static ssize_t rbd_major_show(struct device *dev,
1837                               struct device_attribute *attr, char *buf)
1838 {
1839         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1840
1841         return sprintf(buf, "%d\n", rbd_dev->major);
1842 }
1843
1844 static ssize_t rbd_client_id_show(struct device *dev,
1845                                   struct device_attribute *attr, char *buf)
1846 {
1847         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1848
1849         return sprintf(buf, "client%lld\n",
1850                         ceph_client_id(rbd_dev->rbd_client->client));
1851 }
1852
1853 static ssize_t rbd_pool_show(struct device *dev,
1854                              struct device_attribute *attr, char *buf)
1855 {
1856         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1857
1858         return sprintf(buf, "%s\n", rbd_dev->pool_name);
1859 }
1860
1861 static ssize_t rbd_pool_id_show(struct device *dev,
1862                              struct device_attribute *attr, char *buf)
1863 {
1864         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1865
1866         return sprintf(buf, "%d\n", rbd_dev->pool_id);
1867 }
1868
1869 static ssize_t rbd_name_show(struct device *dev,
1870                              struct device_attribute *attr, char *buf)
1871 {
1872         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1873
1874         return sprintf(buf, "%s\n", rbd_dev->obj);
1875 }
1876
1877 static ssize_t rbd_snap_show(struct device *dev,
1878                              struct device_attribute *attr,
1879                              char *buf)
1880 {
1881         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1882
1883         return sprintf(buf, "%s\n", rbd_dev->snap_name);
1884 }
1885
1886 static ssize_t rbd_image_refresh(struct device *dev,
1887                                  struct device_attribute *attr,
1888                                  const char *buf,
1889                                  size_t size)
1890 {
1891         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
1892         int rc;
1893         int ret = size;
1894
1895         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1896
1897         rc = __rbd_refresh_header(rbd_dev);
1898         if (rc < 0)
1899                 ret = rc;
1900
1901         mutex_unlock(&ctl_mutex);
1902         return ret;
1903 }
1904
/*
 * Per-device sysfs attributes.  The S_IRUGO entries are read-only and
 * have only a show handler; the S_IWUSR entries ("refresh" and
 * "create_snap") are write-only and have only a store handler.
 */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(pool_id, S_IRUGO, rbd_pool_id_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);

/* NULL-terminated list of the attributes declared above. */
static struct attribute *rbd_attrs[] = {
        &dev_attr_size.attr,
        &dev_attr_major.attr,
        &dev_attr_client_id.attr,
        &dev_attr_pool.attr,
        &dev_attr_pool_id.attr,
        &dev_attr_name.attr,
        &dev_attr_current_snap.attr,
        &dev_attr_refresh.attr,
        &dev_attr_create_snap.attr,
        NULL
};

/* Unnamed group wrapping rbd_attrs. */
static struct attribute_group rbd_attr_group = {
        .attrs = rbd_attrs,
};

/* NULL-terminated group list referenced by rbd_device_type. */
static const struct attribute_group *rbd_attr_groups[] = {
        &rbd_attr_group,
        NULL
};
1936
/*
 * Release callback of rbd_device_type; it deliberately does nothing.
 * NOTE(review): rbd_bus_add_dev() also installs rbd_dev_release() in
 * dev->release, which presumably is where the real teardown happens --
 * confirm which callback the driver core invokes.
 */
static void rbd_sysfs_dev_release(struct device *dev)
{
}

/* Device type for rbd devices: supplies the sysfs attribute groups. */
static struct device_type rbd_device_type = {
        .name           = "rbd",
        .groups         = rbd_attr_groups,
        .release        = rbd_sysfs_dev_release,
};
1946
1947
1948 /*
1949   sysfs - snapshots
1950 */
1951
1952 static ssize_t rbd_snap_size_show(struct device *dev,
1953                                   struct device_attribute *attr,
1954                                   char *buf)
1955 {
1956         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1957
1958         return sprintf(buf, "%llu\n", (unsigned long long)snap->size);
1959 }
1960
1961 static ssize_t rbd_snap_id_show(struct device *dev,
1962                                 struct device_attribute *attr,
1963                                 char *buf)
1964 {
1965         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1966
1967         return sprintf(buf, "%llu\n", (unsigned long long)snap->id);
1968 }
1969
/* Read-only sysfs attributes exposed by every snapshot device. */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

/* NULL-terminated list of the snapshot attributes declared above. */
static struct attribute *rbd_snap_attrs[] = {
        &dev_attr_snap_size.attr,
        &dev_attr_snap_id.attr,
        NULL,
};

/* Unnamed group wrapping rbd_snap_attrs. */
static struct attribute_group rbd_snap_attr_group = {
        .attrs = rbd_snap_attrs,
};
1982
1983 static void rbd_snap_dev_release(struct device *dev)
1984 {
1985         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1986         kfree(snap->name);
1987         kfree(snap);
1988 }
1989
/* NULL-terminated group list referenced by rbd_snap_device_type. */
static const struct attribute_group *rbd_snap_attr_groups[] = {
        &rbd_snap_attr_group,
        NULL
};

/*
 * Device type for snapshot devices: supplies their attribute groups
 * and the release callback that frees the rbd_snap.
 */
static struct device_type rbd_snap_device_type = {
        .groups         = rbd_snap_attr_groups,
        .release        = rbd_snap_dev_release,
};
1999
2000 static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
2001                                   struct rbd_snap *snap)
2002 {
2003         list_del(&snap->node);
2004         device_unregister(&snap->dev);
2005 }
2006
2007 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
2008                                   struct rbd_snap *snap,
2009                                   struct device *parent)
2010 {
2011         struct device *dev = &snap->dev;
2012         int ret;
2013
2014         dev->type = &rbd_snap_device_type;
2015         dev->parent = parent;
2016         dev->release = rbd_snap_dev_release;
2017         dev_set_name(dev, "snap_%s", snap->name);
2018         ret = device_register(dev);
2019
2020         return ret;
2021 }
2022
2023 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
2024                               int i, const char *name,
2025                               struct rbd_snap **snapp)
2026 {
2027         int ret;
2028         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
2029         if (!snap)
2030                 return -ENOMEM;
2031         snap->name = kstrdup(name, GFP_KERNEL);
2032         snap->size = rbd_dev->header.snap_sizes[i];
2033         snap->id = rbd_dev->header.snapc->snaps[i];
2034         if (device_is_registered(&rbd_dev->dev)) {
2035                 ret = rbd_register_snap_dev(rbd_dev, snap,
2036                                              &rbd_dev->dev);
2037                 if (ret < 0)
2038                         goto err;
2039         }
2040         *snapp = snap;
2041         return 0;
2042 err:
2043         kfree(snap->name);
2044         kfree(snap);
2045         return ret;
2046 }
2047
/*
 * Step backward through a list of '\0'-terminated names stored back to
 * back.  "name" points at the beginning of an entry (or just past the
 * end of the list) and "start" at the beginning of the list.
 *
 * Returns a pointer to the entry preceding "name", or NULL when there
 * is no room for one between "start" and "name".
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
        const char *cur;

        if (name < start + 2)
                return NULL;

        /* Skip the terminator of the previous entry, then scan back */
        for (cur = name - 2; *cur != '\0'; cur--)
                if (cur == start)
                        return start;

        return cur + 1;
}
2064
/*
 * Compare the old list of snapshots that we have to what's in the header
 * and update it accordingly.  Note that the header holds the snapshots
 * in reverse order (from newest to oldest) and we need to go from
 * oldest to newest so that we don't get a duplicate snap name while
 * doing so (e.g., a snapshot was removed and a new one was created
 * with the same name).
 *
 * Returns 0 on success, -EINVAL if the name list runs out before the
 * snapshot count does, or the error from __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
        const char *name, *first_name;
        int i = rbd_dev->header.total_snaps;
        struct rbd_snap *snap, *old_snap = NULL;
        int ret;
        struct list_head *p, *n;

        /* "name" starts just past the end of the packed name list */
        first_name = rbd_dev->header.snap_names;
        name = first_name + rbd_dev->header.snap_names_len;

        /* Walk the existing list backward, from its tail toward its head */
        list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
                u64 cur_id;

                old_snap = list_entry(p, struct rbd_snap, node);

                /* cur_id is only assigned (and only read) while i != 0 */
                if (i)
                        cur_id = rbd_dev->header.snapc->snaps[i - 1];

                if (!i || old_snap->id < cur_id) {
                        /* old_snap->id was skipped, thus was removed */
                        __rbd_remove_snap_dev(rbd_dev, old_snap);
                        continue;
                }
                if (old_snap->id == cur_id) {
                        /* we have this snapshot already */
                        i--;
                        name = rbd_prev_snap_name(name, first_name);
                        continue;
                }
                /*
                 * NOTE(review): this loop reads snaps[i] while the code
                 * above and the __rbd_add_snap_dev() call below use
                 * index i - 1, and "name" is not stepped back before
                 * its first use here.  With i == total_snaps, snaps[i]
                 * looks like it is one past the last entry -- confirm
                 * whether i - 1 was intended.
                 */
                for (; i > 0;
                     i--, name = rbd_prev_snap_name(name, first_name)) {
                        if (!name) {
                                WARN_ON(1);
                                return -EINVAL;
                        }
                        cur_id = rbd_dev->header.snapc->snaps[i];
                        /* snapshot removal? handle it above */
                        if (cur_id >= old_snap->id)
                                break;
                        /* a new snapshot */
                        ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                        if (ret < 0)
                                return ret;

                        /* note that we add it backward so using n and not p */
                        list_add(&snap->node, n);
                        p = &snap->node;
                }
        }
        /* we're done going over the old snap list, just add what's left */
        for (; i > 0; i--) {
                name = rbd_prev_snap_name(name, first_name);
                if (!name) {
                        WARN_ON(1);
                        return -EINVAL;
                }
                ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
                if (ret < 0)
                        return ret;
                /* each entry goes to the head of the list */
                list_add(&snap->node, &rbd_dev->snaps);
        }

        return 0;
}
2138
2139 static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
2140 {
2141         int ret;
2142         struct device *dev;
2143         struct rbd_snap *snap;
2144
2145         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2146         dev = &rbd_dev->dev;
2147
2148         dev->bus = &rbd_bus_type;
2149         dev->type = &rbd_device_type;
2150         dev->parent = &rbd_root_dev;
2151         dev->release = rbd_dev_release;
2152         dev_set_name(dev, "%d", rbd_dev->id);
2153         ret = device_register(dev);
2154         if (ret < 0)
2155                 goto out;
2156
2157         list_for_each_entry(snap, &rbd_dev->snaps, node) {
2158                 ret = rbd_register_snap_dev(rbd_dev, snap,
2159                                              &rbd_dev->dev);
2160                 if (ret < 0)
2161                         break;
2162         }
2163 out:
2164         mutex_unlock(&ctl_mutex);
2165         return ret;
2166 }
2167
2168 static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
2169 {
2170         device_unregister(&rbd_dev->dev);
2171 }
2172
2173 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2174 {
2175         int ret, rc;
2176
2177         do {
2178                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2179                                          rbd_dev->header.obj_version);
2180                 if (ret == -ERANGE) {
2181                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2182                         rc = __rbd_refresh_header(rbd_dev);
2183                         mutex_unlock(&ctl_mutex);
2184                         if (rc < 0)
2185                                 return rc;
2186                 }
2187         } while (ret == -ERANGE);
2188
2189         return ret;
2190 }
2191
2192 static atomic64_t rbd_id_max = ATOMIC64_INIT(0);
2193
2194 /*
2195  * Get a unique rbd identifier for the given new rbd_dev, and add
2196  * the rbd_dev to the global list.  The minimum rbd id is 1.
2197  */
2198 static void rbd_id_get(struct rbd_device *rbd_dev)
2199 {
2200         rbd_dev->id = atomic64_inc_return(&rbd_id_max);
2201
2202         spin_lock(&rbd_dev_list_lock);
2203         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2204         spin_unlock(&rbd_dev_list_lock);
2205 }
2206
2207 /*
2208  * Remove an rbd_dev from the global list, and record that its
2209  * identifier is no longer in use.
2210  */
2211 static void rbd_id_put(struct rbd_device *rbd_dev)
2212 {
2213         struct list_head *tmp;
2214         int rbd_id = rbd_dev->id;
2215         int max_id;
2216
2217         BUG_ON(rbd_id < 1);
2218
2219         spin_lock(&rbd_dev_list_lock);
2220         list_del_init(&rbd_dev->node);
2221
2222         /*
2223          * If the id being "put" is not the current maximum, there
2224          * is nothing special we need to do.
2225          */
2226         if (rbd_id != atomic64_read(&rbd_id_max)) {
2227                 spin_unlock(&rbd_dev_list_lock);
2228                 return;
2229         }
2230
2231         /*
2232          * We need to update the current maximum id.  Search the
2233          * list to find out what it is.  We're more likely to find
2234          * the maximum at the end, so search the list backward.
2235          */
2236         max_id = 0;
2237         list_for_each_prev(tmp, &rbd_dev_list) {
2238                 struct rbd_device *rbd_dev;
2239
2240                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2241                 if (rbd_id > max_id)
2242                         max_id = rbd_id;
2243         }
2244         spin_unlock(&rbd_dev_list_lock);
2245
2246         /*
2247          * The max id could have been updated by rbd_id_get(), in
2248          * which case it now accurately reflects the new maximum.
2249          * Be careful not to overwrite the maximum value in that
2250          * case.
2251          */
2252         atomic64_cmpxchg(&rbd_id_max, rbd_id, max_id);
2253 }
2254
/*
 * Advance *buf past any leading white space so that it points at the
 * first non-space character (or at the terminating '\0' if there is
 * none).  Returns the length of the token -- the run of non-white
 * space characters -- that starts there.  *buf must be terminated
 * with '\0'.
 */
static inline size_t next_token(const char **buf)
{
        /*
         * The characters for which isspace() is nonzero in the "C"
         * and "POSIX" locales.
         */
        static const char whitespace[] = " \f\n\r\t\v";
        const char *token = *buf + strspn(*buf, whitespace);

        *buf = token;

        return strcspn(token, whitespace);
}
2273
/*
 * Locate the next token in *buf and, when it fits (its length is
 * strictly less than token_size), copy it into the caller's token
 * buffer and terminate it with '\0'.  *buf must be terminated with
 * '\0' on entry.
 *
 * Returns the token length, not counting the '\0'.  A return of 0
 * means no token was found; a return >= token_size means the token
 * did not fit and nothing was copied.
 *
 * In every case *buf is left pointing just past the token.
 */
static inline size_t copy_token(const char **buf,
                                char *token,
                                size_t token_size)
{
        size_t token_len = next_token(buf);
        const char *src = *buf;

        *buf = src + token_len;
        if (token_len >= token_size)
                return token_len;

        memcpy(token, src, token_len);
        token[token_len] = '\0';

        return token_len;
}
2303
2304 /*
2305  * Finds the next token in *buf, dynamically allocates a buffer big
2306  * enough to hold a copy of it, and copies the token into the new
2307  * buffer.  The copy is guaranteed to be terminated with '\0'.  Note
2308  * that a duplicate buffer is created even for a zero-length token.
2309  *
2310  * Returns a pointer to the newly-allocated duplicate, or a null
2311  * pointer if memory for the duplicate was not available.  If
2312  * the lenp argument is a non-null pointer, the length of the token
2313  * (not including the '\0') is returned in *lenp.
2314  *
2315  * If successful, the *buf pointer will be updated to point beyond
2316  * the end of the found token.
2317  *
2318  * Note: uses GFP_KERNEL for allocation.
2319  */
2320 static inline char *dup_token(const char **buf, size_t *lenp)
2321 {
2322         char *dup;
2323         size_t len;
2324
2325         len = next_token(buf);
2326         dup = kmalloc(len + 1, GFP_KERNEL);
2327         if (!dup)
2328                 return NULL;
2329
2330         memcpy(dup, *buf, len);
2331         *(dup + len) = '\0';
2332         *buf += len;
2333
2334         if (lenp)
2335                 *lenp = len;
2336
2337         return dup;
2338 }
2339
2340 /*
2341  * This fills in the pool_name, obj, obj_len, snap_name, obj_len,
2342  * rbd_dev, rbd_md_name, and name fields of the given rbd_dev, based
2343  * on the list of monitor addresses and other options provided via
2344  * /sys/bus/rbd/add.
2345  *
2346  * Note: rbd_dev is assumed to have been initially zero-filled.
2347  */
2348 static int rbd_add_parse_args(struct rbd_device *rbd_dev,
2349                               const char *buf,
2350                               const char **mon_addrs,
2351                               size_t *mon_addrs_size,
2352                               char *options,
2353                               size_t options_size)
2354 {
2355         size_t len;
2356         int ret;
2357
2358         /* The first four tokens are required */
2359
2360         len = next_token(&buf);
2361         if (!len)
2362                 return -EINVAL;
2363         *mon_addrs_size = len + 1;
2364         *mon_addrs = buf;
2365
2366         buf += len;
2367
2368         len = copy_token(&buf, options, options_size);
2369         if (!len || len >= options_size)
2370                 return -EINVAL;
2371
2372         ret = -ENOMEM;
2373         rbd_dev->pool_name = dup_token(&buf, NULL);
2374         if (!rbd_dev->pool_name)
2375                 goto out_err;
2376
2377         rbd_dev->obj = dup_token(&buf, &rbd_dev->obj_len);
2378         if (!rbd_dev->obj)
2379                 goto out_err;
2380
2381         /* Create the name of the header object */
2382
2383         rbd_dev->obj_md_name = kmalloc(rbd_dev->obj_len
2384                                                 + sizeof (RBD_SUFFIX),
2385                                         GFP_KERNEL);
2386         if (!rbd_dev->obj_md_name)
2387                 goto out_err;
2388         sprintf(rbd_dev->obj_md_name, "%s%s", rbd_dev->obj, RBD_SUFFIX);
2389
2390         /*
2391          * The snapshot name is optional.  If none is is supplied,
2392          * we use the default value.
2393          */
2394         rbd_dev->snap_name = dup_token(&buf, &len);
2395         if (!rbd_dev->snap_name)
2396                 goto out_err;
2397         if (!len) {
2398                 /* Replace the empty name with the default */
2399                 kfree(rbd_dev->snap_name);
2400                 rbd_dev->snap_name
2401                         = kmalloc(sizeof (RBD_SNAP_HEAD_NAME), GFP_KERNEL);
2402                 if (!rbd_dev->snap_name)
2403                         goto out_err;
2404
2405                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2406                         sizeof (RBD_SNAP_HEAD_NAME));
2407         }
2408
2409         return 0;
2410
2411 out_err:
2412         kfree(rbd_dev->obj_md_name);
2413         kfree(rbd_dev->obj);
2414         kfree(rbd_dev->pool_name);
2415         rbd_dev->pool_name = NULL;
2416
2417         return ret;
2418 }
2419
2420 static ssize_t rbd_add(struct bus_type *bus,
2421                        const char *buf,
2422                        size_t count)
2423 {
2424         char *options;
2425         struct rbd_device *rbd_dev = NULL;
2426         const char *mon_addrs = NULL;
2427         size_t mon_addrs_size = 0;
2428         struct ceph_osd_client *osdc;
2429         int rc = -ENOMEM;
2430
2431         if (!try_module_get(THIS_MODULE))
2432                 return -ENODEV;
2433
2434         options = kmalloc(count, GFP_KERNEL);
2435         if (!options)
2436                 goto err_nomem;
2437         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2438         if (!rbd_dev)
2439                 goto err_nomem;
2440
2441         /* static rbd_device initialization */
2442         spin_lock_init(&rbd_dev->lock);
2443         INIT_LIST_HEAD(&rbd_dev->node);
2444         INIT_LIST_HEAD(&rbd_dev->snaps);
2445         init_rwsem(&rbd_dev->header_rwsem);
2446
2447         init_rwsem(&rbd_dev->header_rwsem);
2448
2449         /* generate unique id: find highest unique id, add one */
2450         rbd_id_get(rbd_dev);
2451
2452         /* Fill in the device name, now that we have its id. */
2453         BUILD_BUG_ON(DEV_NAME_LEN
2454                         < sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
2455         sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->id);
2456
2457         /* parse add command */
2458         rc = rbd_add_parse_args(rbd_dev, buf, &mon_addrs, &mon_addrs_size,
2459                                 options, count);
2460         if (rc)
2461                 goto err_put_id;
2462
2463         rbd_dev->rbd_client = rbd_get_client(mon_addrs, mon_addrs_size - 1,
2464                                                 options);
2465         if (IS_ERR(rbd_dev->rbd_client)) {
2466                 rc = PTR_ERR(rbd_dev->rbd_client);
2467                 goto err_put_id;
2468         }
2469
2470         /* pick the pool */
2471         osdc = &rbd_dev->rbd_client->client->osdc;
2472         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2473         if (rc < 0)
2474                 goto err_out_client;
2475         rbd_dev->pool_id = rc;
2476
2477         /* register our block device */
2478         rc = register_blkdev(0, rbd_dev->name);
2479         if (rc < 0)
2480                 goto err_out_client;
2481         rbd_dev->major = rc;
2482
2483         rc = rbd_bus_add_dev(rbd_dev);
2484         if (rc)
2485                 goto err_out_blkdev;
2486
2487         /*
2488          * At this point cleanup in the event of an error is the job
2489          * of the sysfs code (initiated by rbd_bus_del_dev()).
2490          *
2491          * Set up and announce blkdev mapping.
2492          */
2493         rc = rbd_init_disk(rbd_dev);
2494         if (rc)
2495                 goto err_out_bus;
2496
2497         rc = rbd_init_watch_dev(rbd_dev);
2498         if (rc)
2499                 goto err_out_bus;
2500
2501         return count;
2502
2503 err_out_bus:
2504         /* this will also clean up rest of rbd_dev stuff */
2505
2506         rbd_bus_del_dev(rbd_dev);
2507         kfree(options);
2508         return rc;
2509
2510 err_out_blkdev:
2511         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2512 err_out_client:
2513         rbd_put_client(rbd_dev);
2514 err_put_id:
2515         if (rbd_dev->pool_name) {
2516                 kfree(rbd_dev->snap_name);
2517                 kfree(rbd_dev->obj_md_name);
2518                 kfree(rbd_dev->obj);
2519                 kfree(rbd_dev->pool_name);
2520         }
2521         rbd_id_put(rbd_dev);
2522 err_nomem:
2523         kfree(rbd_dev);
2524         kfree(options);
2525
2526         dout("Error adding device %s\n", buf);
2527         module_put(THIS_MODULE);
2528
2529         return (ssize_t) rc;
2530 }
2531
2532 static struct rbd_device *__rbd_get_dev(unsigned long id)
2533 {
2534         struct list_head *tmp;
2535         struct rbd_device *rbd_dev;
2536
2537         spin_lock(&rbd_dev_list_lock);
2538         list_for_each(tmp, &rbd_dev_list) {
2539                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2540                 if (rbd_dev->id == id) {
2541                         spin_unlock(&rbd_dev_list_lock);
2542                         return rbd_dev;
2543                 }
2544         }
2545         spin_unlock(&rbd_dev_list_lock);
2546         return NULL;
2547 }
2548
2549 static void rbd_dev_release(struct device *dev)
2550 {
2551         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2552
2553         if (rbd_dev->watch_request) {
2554                 struct ceph_client *client = rbd_dev->rbd_client->client;
2555
2556                 ceph_osdc_unregister_linger_request(&client->osdc,
2557                                                     rbd_dev->watch_request);
2558         }
2559         if (rbd_dev->watch_event)
2560                 rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);
2561
2562         rbd_put_client(rbd_dev);
2563
2564         /* clean up and free blkdev */
2565         rbd_free_disk(rbd_dev);
2566         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2567
2568         /* done with the id, and with the rbd_dev */
2569         kfree(rbd_dev->snap_name);
2570         kfree(rbd_dev->obj_md_name);
2571         kfree(rbd_dev->pool_name);
2572         kfree(rbd_dev->obj);
2573         rbd_id_put(rbd_dev);
2574         kfree(rbd_dev);
2575
2576         /* release module ref */
2577         module_put(THIS_MODULE);
2578 }
2579
2580 static ssize_t rbd_remove(struct bus_type *bus,
2581                           const char *buf,
2582                           size_t count)
2583 {
2584         struct rbd_device *rbd_dev = NULL;
2585         int target_id, rc;
2586         unsigned long ul;
2587         int ret = count;
2588
2589         rc = strict_strtoul(buf, 10, &ul);
2590         if (rc)
2591                 return rc;
2592
2593         /* convert to int; abort if we lost anything in the conversion */
2594         target_id = (int) ul;
2595         if (target_id != ul)
2596                 return -EINVAL;
2597
2598         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2599
2600         rbd_dev = __rbd_get_dev(target_id);
2601         if (!rbd_dev) {
2602                 ret = -ENOENT;
2603                 goto done;
2604         }
2605
2606         __rbd_remove_all_snaps(rbd_dev);
2607         rbd_bus_del_dev(rbd_dev);
2608
2609 done:
2610         mutex_unlock(&ctl_mutex);
2611         return ret;
2612 }
2613
2614 static ssize_t rbd_snap_add(struct device *dev,
2615                             struct device_attribute *attr,
2616                             const char *buf,
2617                             size_t count)
2618 {
2619         struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
2620         int ret;
2621         char *name = kmalloc(count + 1, GFP_KERNEL);
2622         if (!name)
2623                 return -ENOMEM;
2624
2625         snprintf(name, count, "%s", buf);
2626
2627         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2628
2629         ret = rbd_header_add_snap(rbd_dev,
2630                                   name, GFP_KERNEL);
2631         if (ret < 0)
2632                 goto err_unlock;
2633
2634         ret = __rbd_refresh_header(rbd_dev);
2635         if (ret < 0)
2636                 goto err_unlock;
2637
2638         /* shouldn't hold ctl_mutex when notifying.. notify might
2639            trigger a watch callback that would need to get that mutex */
2640         mutex_unlock(&ctl_mutex);
2641
2642         /* make a best effort, don't error if failed */
2643         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2644
2645         ret = count;
2646         kfree(name);
2647         return ret;
2648
2649 err_unlock:
2650         mutex_unlock(&ctl_mutex);
2651         kfree(name);
2652         return ret;
2653 }
2654
2655 /*
2656  * create control files in sysfs
2657  * /sys/bus/rbd/...
2658  */
2659 static int rbd_sysfs_init(void)
2660 {
2661         int ret;
2662
2663         ret = device_register(&rbd_root_dev);
2664         if (ret < 0)
2665                 return ret;
2666
2667         ret = bus_register(&rbd_bus_type);
2668         if (ret < 0)
2669                 device_unregister(&rbd_root_dev);
2670
2671         return ret;
2672 }
2673
2674 static void rbd_sysfs_cleanup(void)
2675 {
2676         bus_unregister(&rbd_bus_type);
2677         device_unregister(&rbd_root_dev);
2678 }
2679
2680 int __init rbd_init(void)
2681 {
2682         int rc;
2683
2684         rc = rbd_sysfs_init();
2685         if (rc)
2686                 return rc;
2687         pr_info("loaded " RBD_DRV_NAME_LONG "\n");
2688         return 0;
2689 }
2690
2691 void __exit rbd_exit(void)
2692 {
2693         rbd_sysfs_cleanup();
2694 }
2695
2696 module_init(rbd_init);
2697 module_exit(rbd_exit);
2698
2699 MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
2700 MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
2701 MODULE_DESCRIPTION("rados block device");
2702
2703 /* following authorship retained from original osdblk.c */
2704 MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
2705
2706 MODULE_LICENSE("GPL");