rbd: protect the rbd_dev_list with a spinlock
[cascardo/linux.git] / drivers / block / rbd.c
1 /*
2    rbd.c -- Export ceph rados objects as a Linux block device
3
4
5    based on drivers/block/osdblk.c:
6
7    Copyright 2009 Red Hat, Inc.
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation.
12
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17
18    You should have received a copy of the GNU General Public License
19    along with this program; see the file COPYING.  If not, write to
20    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24    For usage instructions, please refer to:
25
26                  Documentation/ABI/testing/sysfs-bus-rbd
27
28  */
29
30 #include <linux/ceph/libceph.h>
31 #include <linux/ceph/osd_client.h>
32 #include <linux/ceph/mon_client.h>
33 #include <linux/ceph/decode.h>
34 #include <linux/parser.h>
35
36 #include <linux/kernel.h>
37 #include <linux/device.h>
38 #include <linux/module.h>
39 #include <linux/fs.h>
40 #include <linux/blkdev.h>
41
42 #include "rbd_types.h"
43
#define DRV_NAME "rbd"
#define DRV_NAME_LONG "rbd (rados block device)"

#define RBD_MINORS_PER_MAJOR    256             /* max minors per blkdev */

/* header object name is the image name plus RBD_SUFFIX */
#define RBD_MAX_MD_NAME_LEN     (RBD_MAX_OBJ_NAME_LEN + sizeof(RBD_SUFFIX))
#define RBD_MAX_POOL_NAME_LEN   64
#define RBD_MAX_SNAP_NAME_LEN   32
#define RBD_MAX_OPT_LEN         1024

/* sentinel snapshot name meaning "no snapshot, map the image head" */
#define RBD_SNAP_HEAD_NAME      "-"

#define DEV_NAME_LEN            32

/* default for the notify_timeout option (presumably seconds -- confirm) */
#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
59
/*
 * block device image metadata (in-memory version)
 */
struct rbd_image_header {
	u64 image_size;		/* size in bytes of the image head */
	char block_name[32];	/* prefix for data object names ("<name>.NNN") */
	__u8 obj_order;		/* object size is 1 << obj_order bytes */
	__u8 crypt_type;
	__u8 comp_type;
	struct rw_semaphore snap_rwsem;	/* protects snapc and snap_* below */
	struct ceph_snap_context *snapc;
	size_t snap_names_len;	/* total bytes in the snap_names buffer */
	u64 snap_seq;
	u32 total_snaps;

	char *snap_names;	/* '\0'-separated names, one per snapshot */
	u64 *snap_sizes;	/* per-snapshot image size, same order as names */

	u64 obj_version;	/* version of the header object last read */
};
80
/* per-client options parsed from the mount-style options string */
struct rbd_options {
	int	notify_timeout;	/* defaults to RBD_NOTIFY_TIMEOUT_DEFAULT */
};
84
/*
 * an instance of the client.  multiple devices may share a client.
 */
struct rbd_client {
	struct ceph_client	*client;
	struct rbd_options	*rbd_opts;	/* owned; freed on release */
	struct kref		kref;		/* one ref per sharing device */
	struct list_head	node;		/* entry in rbd_client_list */
};
94
struct rbd_req_coll;

/*
 * a single io request
 */
struct rbd_request {
	struct request		*rq;		/* blk layer request */
	struct bio		*bio;		/* cloned bio */
	struct page		**pages;	/* list of used pages */
	u64			len;
	int			coll_index;	/* slot in coll->status[] */
	struct rbd_req_coll	*coll;		/* owning collection (or NULL) */
};
108
/* completion state of one request within a collection */
struct rbd_req_status {
	int done;	/* nonzero once this request has completed */
	int rc;		/* completion status */
	u64 bytes;	/* bytes to complete on the blk request */
};
114
/*
 * a collection of requests
 */
struct rbd_req_coll {
	int			total;		/* number of status slots */
	int			num_done;	/* completed-in-order count */
	struct kref		kref;
	struct rbd_req_status	status[0];	/* flexible array, total entries */
};
124
/* one snapshot of an image, exposed as a sysfs device */
struct rbd_snap {
	struct	device		dev;
	const char		*name;
	size_t			size;	/* image size at this snapshot */
	struct list_head	node;	/* entry in rbd_device->snaps */
	u64			id;	/* snapshot id within the snap context */
};
132
/*
 * a single device
 */
struct rbd_device {
	int			id;		/* blkdev unique id */

	int			major;		/* blkdev assigned major */
	struct gendisk		*disk;		/* blkdev's gendisk and rq */
	struct request_queue	*q;

	struct rbd_client	*rbd_client;	/* shared; see rbd_get_client() */

	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */

	spinlock_t		lock;		/* queue lock */

	struct rbd_image_header	header;
	char			obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
	int			obj_len;
	char			obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
	char			pool_name[RBD_MAX_POOL_NAME_LEN];
	int			poolid;

	/* watch on the header object for change notifications */
	struct ceph_osd_event	*watch_event;
	struct ceph_osd_request *watch_request;

	char			snap_name[RBD_MAX_SNAP_NAME_LEN];
	u32 cur_snap;	/* index+1 of current snapshot within snap context
			   0 - for the head */
	int read_only;	/* nonzero when mapped at a snapshot */

	struct list_head	node;	/* entry in rbd_dev_list */

	/* list of snapshots */
	struct list_head	snaps;

	/* sysfs related */
	struct device		dev;
};
172
/* all rbd devices hang off this bus so they appear in sysfs */
static struct bus_type rbd_bus_type = {
	.name		= "rbd",
};

static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */

static LIST_HEAD(rbd_dev_list);    /* devices */
static DEFINE_SPINLOCK(rbd_dev_list_lock);

static LIST_HEAD(rbd_client_list);	/* clients */
static DEFINE_SPINLOCK(node_lock);	/* protects client get/put */

/* forward declarations for routines defined later in this file */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev);
static void rbd_dev_release(struct device *dev);
static ssize_t rbd_snap_add(struct device *dev,
			    struct device_attribute *attr,
			    const char *buf,
			    size_t count);
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap);
193
194
/* map a sysfs struct device back to its embedding rbd_device */
static struct rbd_device *dev_to_rbd(struct device *dev)
{
	return container_of(dev, struct rbd_device, dev);
}
199
/* take a reference on the device's embedded struct device */
static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
{
	return get_device(&rbd_dev->dev);
}
204
/* drop the reference taken by rbd_get_dev() */
static void rbd_put_dev(struct rbd_device *rbd_dev)
{
	put_device(&rbd_dev->dev);
}
209
210 static int __rbd_update_snaps(struct rbd_device *rbd_dev);
211
212 static int rbd_open(struct block_device *bdev, fmode_t mode)
213 {
214         struct gendisk *disk = bdev->bd_disk;
215         struct rbd_device *rbd_dev = disk->private_data;
216
217         rbd_get_dev(rbd_dev);
218
219         set_device_ro(bdev, rbd_dev->read_only);
220
221         if ((mode & FMODE_WRITE) && rbd_dev->read_only)
222                 return -EROFS;
223
224         return 0;
225 }
226
/* Block device release: drop the reference taken in rbd_open(). */
static int rbd_release(struct gendisk *disk, fmode_t mode)
{
	struct rbd_device *rbd_dev = disk->private_data;

	rbd_put_dev(rbd_dev);

	return 0;
}
235
/* block layer operations for rbd devices */
static const struct block_device_operations rbd_bd_ops = {
	.owner			= THIS_MODULE,
	.open			= rbd_open,
	.release		= rbd_release,
};
241
242 /*
243  * Initialize an rbd client instance.
244  * We own *opt.
245  */
246 static struct rbd_client *rbd_client_create(struct ceph_options *opt,
247                                             struct rbd_options *rbd_opts)
248 {
249         struct rbd_client *rbdc;
250         int ret = -ENOMEM;
251
252         dout("rbd_client_create\n");
253         rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
254         if (!rbdc)
255                 goto out_opt;
256
257         kref_init(&rbdc->kref);
258         INIT_LIST_HEAD(&rbdc->node);
259
260         rbdc->client = ceph_create_client(opt, rbdc, 0, 0);
261         if (IS_ERR(rbdc->client))
262                 goto out_rbdc;
263         opt = NULL; /* Now rbdc->client is responsible for opt */
264
265         ret = ceph_open_session(rbdc->client);
266         if (ret < 0)
267                 goto out_err;
268
269         rbdc->rbd_opts = rbd_opts;
270
271         spin_lock(&node_lock);
272         list_add_tail(&rbdc->node, &rbd_client_list);
273         spin_unlock(&node_lock);
274
275         dout("rbd_client_create created %p\n", rbdc);
276         return rbdc;
277
278 out_err:
279         ceph_destroy_client(rbdc->client);
280 out_rbdc:
281         kfree(rbdc);
282 out_opt:
283         if (opt)
284                 ceph_destroy_options(opt);
285         return ERR_PTR(ret);
286 }
287
288 /*
289  * Find a ceph client with specific addr and configuration.
290  */
291 static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
292 {
293         struct rbd_client *client_node;
294
295         if (opt->flags & CEPH_OPT_NOSHARE)
296                 return NULL;
297
298         list_for_each_entry(client_node, &rbd_client_list, node)
299                 if (ceph_compare_options(opt, client_node->client) == 0)
300                         return client_node;
301         return NULL;
302 }
303
/*
 * mount options
 *
 * Tokens below Opt_last_int carry an integer argument; tokens between
 * Opt_last_int and Opt_last_string carry a string argument (see
 * parse_rbd_opts_token()).
 */
enum {
	Opt_notify_timeout,
	Opt_last_int,
	/* int args above */
	Opt_last_string,
	/* string args above */
};

static match_table_t rbdopt_tokens = {
	{Opt_notify_timeout, "notify_timeout=%d"},
	/* int args above */
	/* string args above */
	{-1, NULL}
};
321
322 static int parse_rbd_opts_token(char *c, void *private)
323 {
324         struct rbd_options *rbdopt = private;
325         substring_t argstr[MAX_OPT_ARGS];
326         int token, intval, ret;
327
328         token = match_token(c, rbdopt_tokens, argstr);
329         if (token < 0)
330                 return -EINVAL;
331
332         if (token < Opt_last_int) {
333                 ret = match_int(&argstr[0], &intval);
334                 if (ret < 0) {
335                         pr_err("bad mount option arg (not int) "
336                                "at '%s'\n", c);
337                         return ret;
338                 }
339                 dout("got int token %d val %d\n", token, intval);
340         } else if (token > Opt_last_int && token < Opt_last_string) {
341                 dout("got string token %d val %s\n", token,
342                      argstr[0].from);
343         } else {
344                 dout("got token %d\n", token);
345         }
346
347         switch (token) {
348         case Opt_notify_timeout:
349                 rbdopt->notify_timeout = intval;
350                 break;
351         default:
352                 BUG_ON(token);
353         }
354         return 0;
355 }
356
/*
 * Get a ceph client with specific addr and configuration, if one does
 * not exist create it.
 *
 * On success rbd_dev->rbd_client is set (possibly to a client shared
 * with other devices) and 0 is returned.  On failure a negative errno
 * is returned and rbd_opts has been freed.
 *
 * NOTE(review): node_lock is dropped between the failed
 * __rbd_client_find() and the rbd_client_create() that inserts the
 * new client, so two concurrent callers could each create a client
 * for the same options -- confirm all callers are serialized (e.g. by
 * ctl_mutex).
 */
static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
			  char *options)
{
	struct rbd_client *rbdc;
	struct ceph_options *opt;
	int ret;
	struct rbd_options *rbd_opts;

	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
	if (!rbd_opts)
		return -ENOMEM;

	/* default for any option not present in the options string */
	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;

	opt = ceph_parse_options(options, mon_addr,
				mon_addr + strlen(mon_addr),
				parse_rbd_opts_token, rbd_opts);
	if (IS_ERR(opt)) {
		ret = PTR_ERR(opt);
		goto done_err;
	}

	spin_lock(&node_lock);
	rbdc = __rbd_client_find(opt);
	if (rbdc) {
		/* sharing an existing client: opt/rbd_opts not needed */
		ceph_destroy_options(opt);
		kfree(rbd_opts);

		/* using an existing client */
		kref_get(&rbdc->kref);
		rbd_dev->rbd_client = rbdc;
		spin_unlock(&node_lock);
		return 0;
	}
	spin_unlock(&node_lock);

	/* rbd_client_create() consumes opt; rbd_opts only on success */
	rbdc = rbd_client_create(opt, rbd_opts);
	if (IS_ERR(rbdc)) {
		ret = PTR_ERR(rbdc);
		goto done_err;
	}

	rbd_dev->rbd_client = rbdc;
	return 0;
done_err:
	kfree(rbd_opts);
	return ret;
}
409
/*
 * Destroy ceph client
 *
 * kref release callback: unlinks the client from rbd_client_list and
 * frees everything it owns, including its rbd_opts.
 *
 * Caller must hold node_lock.
 */
static void rbd_client_release(struct kref *kref)
{
	struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);

	dout("rbd_release_client %p\n", rbdc);
	list_del(&rbdc->node);

	ceph_destroy_client(rbdc->client);
	kfree(rbdc->rbd_opts);
	kfree(rbdc);
}
426
/*
 * Drop reference to ceph client node. If it's not referenced anymore, release
 * it.
 *
 * node_lock is held across the kref_put because rbd_client_release()
 * unlinks the client from rbd_client_list.
 */
static void rbd_put_client(struct rbd_device *rbd_dev)
{
	spin_lock(&node_lock);
	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
	spin_unlock(&node_lock);
	rbd_dev->rbd_client = NULL;
}
438
439 /*
440  * Destroy requests collection
441  */
442 static void rbd_coll_release(struct kref *kref)
443 {
444         struct rbd_req_coll *coll =
445                 container_of(kref, struct rbd_req_coll, kref);
446
447         dout("rbd_coll_release %p\n", coll);
448         kfree(coll);
449 }
450
451 /*
452  * Create a new header structure, translate header format from the on-disk
453  * header.
454  */
455 static int rbd_header_from_disk(struct rbd_image_header *header,
456                                  struct rbd_image_header_ondisk *ondisk,
457                                  int allocated_snaps,
458                                  gfp_t gfp_flags)
459 {
460         int i;
461         u32 snap_count = le32_to_cpu(ondisk->snap_count);
462         int ret = -ENOMEM;
463
464         if (memcmp(ondisk, RBD_HEADER_TEXT, sizeof(RBD_HEADER_TEXT)))
465                 return -ENXIO;
466
467         init_rwsem(&header->snap_rwsem);
468         header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
469         header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
470                                 snap_count * sizeof (*ondisk),
471                                 gfp_flags);
472         if (!header->snapc)
473                 return -ENOMEM;
474         if (snap_count) {
475                 header->snap_names = kmalloc(header->snap_names_len,
476                                              GFP_KERNEL);
477                 if (!header->snap_names)
478                         goto err_snapc;
479                 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
480                                              GFP_KERNEL);
481                 if (!header->snap_sizes)
482                         goto err_names;
483         } else {
484                 header->snap_names = NULL;
485                 header->snap_sizes = NULL;
486         }
487         memcpy(header->block_name, ondisk->block_name,
488                sizeof(ondisk->block_name));
489
490         header->image_size = le64_to_cpu(ondisk->image_size);
491         header->obj_order = ondisk->options.order;
492         header->crypt_type = ondisk->options.crypt_type;
493         header->comp_type = ondisk->options.comp_type;
494
495         atomic_set(&header->snapc->nref, 1);
496         header->snap_seq = le64_to_cpu(ondisk->snap_seq);
497         header->snapc->num_snaps = snap_count;
498         header->total_snaps = snap_count;
499
500         if (snap_count && allocated_snaps == snap_count) {
501                 for (i = 0; i < snap_count; i++) {
502                         header->snapc->snaps[i] =
503                                 le64_to_cpu(ondisk->snaps[i].id);
504                         header->snap_sizes[i] =
505                                 le64_to_cpu(ondisk->snaps[i].image_size);
506                 }
507
508                 /* copy snapshot names */
509                 memcpy(header->snap_names, &ondisk->snaps[i],
510                         header->snap_names_len);
511         }
512
513         return 0;
514
515 err_names:
516         kfree(header->snap_names);
517 err_snapc:
518         kfree(header->snapc);
519         return ret;
520 }
521
/*
 * Map a cur_snap value (index+1 within the snap context; see struct
 * rbd_device) to an index into header->snapc->snaps[].
 */
static int snap_index(struct rbd_image_header *header, int snap_num)
{
	return header->total_snaps - snap_num;
}
526
/*
 * Return the snapshot id the device is currently mapped at, or 0 when
 * it is mapped at the image head (cur_snap == 0).
 */
static u64 cur_snap_id(struct rbd_device *rbd_dev)
{
	struct rbd_image_header *header = &rbd_dev->header;

	if (!rbd_dev->cur_snap)
		return 0;

	return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
}
536
537 static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
538                         u64 *seq, u64 *size)
539 {
540         int i;
541         char *p = header->snap_names;
542
543         for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
544                 if (strcmp(snap_name, p) == 0)
545                         break;
546         }
547         if (i == header->total_snaps)
548                 return -ENOENT;
549         if (seq)
550                 *seq = header->snapc->snaps[i];
551
552         if (size)
553                 *size = header->snap_sizes[i];
554
555         return i;
556 }
557
/*
 * Select dev->snap_name (or the image head when the name is
 * RBD_SNAP_HEAD_NAME) as the device's current mapping.  Updates
 * cur_snap, read_only and the snap context's seq, and stores the
 * mapped image size in *size when non-NULL.  Returns 0 on success or
 * -ENOENT when the named snapshot does not exist.
 */
static int rbd_header_set_snap(struct rbd_device *dev, u64 *size)
{
	struct rbd_image_header *header = &dev->header;
	struct ceph_snap_context *snapc = header->snapc;
	int ret = -ENOENT;

	BUILD_BUG_ON(sizeof (dev->snap_name) < sizeof (RBD_SNAP_HEAD_NAME));

	down_write(&header->snap_rwsem);

	if (!memcmp(dev->snap_name, RBD_SNAP_HEAD_NAME,
		    sizeof (RBD_SNAP_HEAD_NAME))) {
		/* mapping the head: writable, seq follows the header */
		if (header->total_snaps)
			snapc->seq = header->snap_seq;
		else
			snapc->seq = 0;
		dev->cur_snap = 0;
		dev->read_only = 0;
		if (size)
			*size = header->image_size;
	} else {
		ret = snap_by_name(header, dev->snap_name, &snapc->seq, size);
		if (ret < 0)
			goto done;

		/* cur_snap holds index+1 within the snap context */
		dev->cur_snap = header->total_snaps - ret;
		dev->read_only = 1;	/* snapshots are immutable */
	}

	ret = 0;
done:
	up_write(&header->snap_rwsem);
	return ret;
}
592
593 static void rbd_header_free(struct rbd_image_header *header)
594 {
595         kfree(header->snapc);
596         kfree(header->snap_names);
597         kfree(header->snap_sizes);
598 }
599
600 /*
601  * get the actual striped segment name, offset and length
602  */
603 static u64 rbd_get_segment(struct rbd_image_header *header,
604                            const char *block_name,
605                            u64 ofs, u64 len,
606                            char *seg_name, u64 *segofs)
607 {
608         u64 seg = ofs >> header->obj_order;
609
610         if (seg_name)
611                 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
612                          "%s.%012llx", block_name, seg);
613
614         ofs = ofs & ((1 << header->obj_order) - 1);
615         len = min_t(u64, len, (1 << header->obj_order) - ofs);
616
617         if (segofs)
618                 *segofs = ofs;
619
620         return len;
621 }
622
623 static int rbd_get_num_segments(struct rbd_image_header *header,
624                                 u64 ofs, u64 len)
625 {
626         u64 start_seg = ofs >> header->obj_order;
627         u64 end_seg = (ofs + len - 1) >> header->obj_order;
628         return end_seg - start_seg + 1;
629 }
630
631 /*
632  * returns the size of an object in the image
633  */
634 static u64 rbd_obj_bytes(struct rbd_image_header *header)
635 {
636         return 1 << header->obj_order;
637 }
638
639 /*
640  * bio helpers
641  */
642
643 static void bio_chain_put(struct bio *chain)
644 {
645         struct bio *tmp;
646
647         while (chain) {
648                 tmp = chain;
649                 chain = chain->bi_next;
650                 bio_put(tmp);
651         }
652 }
653
/*
 * zeros a bio chain, starting at specific offset
 *
 * start_ofs is a byte offset measured from the start of the whole
 * chain; every byte at or past it is cleared.
 */
static void zero_bio_chain(struct bio *chain, int start_ofs)
{
	struct bio_vec *bv;
	unsigned long flags;
	void *buf;
	int i;
	int pos = 0;	/* byte offset of the current bio_vec in the chain */

	while (chain) {
		bio_for_each_segment(bv, chain, i) {
			if (pos + bv->bv_len > start_ofs) {
				/* part (or all) of this vec needs zeroing */
				int remainder = max(start_ofs - pos, 0);
				buf = bvec_kmap_irq(bv, &flags);
				memset(buf + remainder, 0,
				       bv->bv_len - remainder);
				bvec_kunmap_irq(buf, &flags);
			}
			pos += bv->bv_len;
		}

		chain = chain->bi_next;
	}
}
680
/*
 * bio_chain_clone - clone a chain of bios up to a certain length.
 * might return a bio_pair that will need to be released.
 *
 * Clones bios from *old until len bytes are covered.  On return *old
 * points at the first unconsumed bio and *next at where a subsequent
 * clone should continue (the second half of a split, or the next
 * bio).  Returns the new chain, or NULL on allocation/split failure.
 */
static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
				   struct bio_pair **bp,
				   int len, gfp_t gfpmask)
{
	struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
	int total = 0;

	/* release any split left over from the previous call */
	if (*bp) {
		bio_pair_release(*bp);
		*bp = NULL;
	}

	while (old_chain && (total < len)) {
		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
		if (!tmp)
			goto err_out;

		if (total + old_chain->bi_size > len) {
			/* NOTE(review): this inner bp shadows the bp
			 * parameter; the split is never stored through
			 * *bp here -- confirm callers release it via
			 * *next instead. */
			struct bio_pair *bp;

			/*
			 * this split can only happen with a single paged bio,
			 * split_bio will BUG_ON if this is not the case
			 */
			dout("bio_chain_clone split! total=%d remaining=%d"
			     "bi_size=%d\n",
			     (int)total, (int)len-total,
			     (int)old_chain->bi_size);

			/* split the bio. We'll release it either in the next
			   call, or it will have to be released outside */
			bp = bio_split(old_chain, (len - total) / 512ULL);
			if (!bp)
				goto err_out;

			__bio_clone(tmp, &bp->bio1);

			*next = &bp->bio2;
		} else {
			__bio_clone(tmp, old_chain);
			*next = old_chain->bi_next;
		}

		tmp->bi_bdev = NULL;
		/* never block for memory after the first allocation */
		gfpmask &= ~__GFP_WAIT;
		tmp->bi_next = NULL;

		/* append the clone to the new chain */
		if (!new_chain) {
			new_chain = tail = tmp;
		} else {
			tail->bi_next = tmp;
			tail = tmp;
		}
		old_chain = old_chain->bi_next;

		total += tmp->bi_size;
	}

	/* the input chain must have covered the requested length */
	BUG_ON(total < len);

	if (tail)
		tail->bi_next = NULL;

	*old = old_chain;

	return new_chain;

err_out:
	dout("bio_chain_clone with err\n");
	bio_chain_put(new_chain);
	return NULL;
}
757
758 /*
759  * helpers for osd request op vectors.
760  */
761 static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
762                             int num_ops,
763                             int opcode,
764                             u32 payload_len)
765 {
766         *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
767                        GFP_NOIO);
768         if (!*ops)
769                 return -ENOMEM;
770         (*ops)[0].op = opcode;
771         /*
772          * op extent offset and length will be set later on
773          * in calc_raw_layout()
774          */
775         (*ops)[0].payload_len = payload_len;
776         return 0;
777 }
778
/* free an op vector allocated by rbd_create_rw_ops() */
static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
{
	kfree(ops);
}
783
/*
 * Record completion of one request in a collection and then complete,
 * in order, the contiguous run of finished requests starting at
 * num_done.  Collection state is protected by the request queue lock.
 */
static void rbd_coll_end_req_index(struct request *rq,
				   struct rbd_req_coll *coll,
				   int index,
				   int ret, u64 len)
{
	struct request_queue *q;
	int min, max, i;

	dout("rbd_coll_end_req_index %p index %d ret %d len %lld\n",
	     coll, index, ret, len);

	if (!rq)
		return;

	if (!coll) {
		/* not part of a collection: complete directly */
		blk_end_request(rq, ret, len);
		return;
	}

	q = rq->q;

	spin_lock_irq(q->queue_lock);
	coll->status[index].done = 1;
	coll->status[index].rc = ret;
	coll->status[index].bytes = len;
	max = min = coll->num_done;
	/* extend max over requests that are now done, in order */
	while (max < coll->total && coll->status[max].done)
		max++;

	for (i = min; i<max; i++) {
		__blk_end_request(rq, coll->status[i].rc,
				  coll->status[i].bytes);
		coll->num_done++;
		/* each completed request drops its collection ref */
		kref_put(&coll->kref, rbd_coll_release);
	}
	spin_unlock_irq(q->queue_lock);
}
821
/* complete a request using the collection info stored in it */
static void rbd_coll_end_req(struct rbd_request *req,
			     int ret, u64 len)
{
	rbd_coll_end_req_index(req->rq, req->coll, req->coll_index, ret, len);
}
827
828 /*
829  * Send ceph osd request
830  */
831 static int rbd_do_request(struct request *rq,
832                           struct rbd_device *dev,
833                           struct ceph_snap_context *snapc,
834                           u64 snapid,
835                           const char *obj, u64 ofs, u64 len,
836                           struct bio *bio,
837                           struct page **pages,
838                           int num_pages,
839                           int flags,
840                           struct ceph_osd_req_op *ops,
841                           int num_reply,
842                           struct rbd_req_coll *coll,
843                           int coll_index,
844                           void (*rbd_cb)(struct ceph_osd_request *req,
845                                          struct ceph_msg *msg),
846                           struct ceph_osd_request **linger_req,
847                           u64 *ver)
848 {
849         struct ceph_osd_request *req;
850         struct ceph_file_layout *layout;
851         int ret;
852         u64 bno;
853         struct timespec mtime = CURRENT_TIME;
854         struct rbd_request *req_data;
855         struct ceph_osd_request_head *reqhead;
856         struct rbd_image_header *header = &dev->header;
857         struct ceph_osd_client *osdc;
858
859         req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
860         if (!req_data) {
861                 if (coll)
862                         rbd_coll_end_req_index(rq, coll, coll_index,
863                                                -ENOMEM, len);
864                 return -ENOMEM;
865         }
866
867         if (coll) {
868                 req_data->coll = coll;
869                 req_data->coll_index = coll_index;
870         }
871
872         dout("rbd_do_request obj=%s ofs=%lld len=%lld\n", obj, len, ofs);
873
874         down_read(&header->snap_rwsem);
875
876         osdc = &dev->rbd_client->client->osdc;
877         req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
878                                         false, GFP_NOIO, pages, bio);
879         if (!req) {
880                 up_read(&header->snap_rwsem);
881                 ret = -ENOMEM;
882                 goto done_pages;
883         }
884
885         req->r_callback = rbd_cb;
886
887         req_data->rq = rq;
888         req_data->bio = bio;
889         req_data->pages = pages;
890         req_data->len = len;
891
892         req->r_priv = req_data;
893
894         reqhead = req->r_request->front.iov_base;
895         reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
896
897         strncpy(req->r_oid, obj, sizeof(req->r_oid));
898         req->r_oid_len = strlen(req->r_oid);
899
900         layout = &req->r_file_layout;
901         memset(layout, 0, sizeof(*layout));
902         layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
903         layout->fl_stripe_count = cpu_to_le32(1);
904         layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
905         layout->fl_pg_preferred = cpu_to_le32(-1);
906         layout->fl_pg_pool = cpu_to_le32(dev->poolid);
907         ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
908                                 req, ops);
909
910         ceph_osdc_build_request(req, ofs, &len,
911                                 ops,
912                                 snapc,
913                                 &mtime,
914                                 req->r_oid, req->r_oid_len);
915         up_read(&header->snap_rwsem);
916
917         if (linger_req) {
918                 ceph_osdc_set_request_linger(osdc, req);
919                 *linger_req = req;
920         }
921
922         ret = ceph_osdc_start_request(osdc, req, false);
923         if (ret < 0)
924                 goto done_err;
925
926         if (!rbd_cb) {
927                 ret = ceph_osdc_wait_request(osdc, req);
928                 if (ver)
929                         *ver = le64_to_cpu(req->r_reassert_version.version);
930                 dout("reassert_ver=%lld\n",
931                      le64_to_cpu(req->r_reassert_version.version));
932                 ceph_osdc_put_request(req);
933         }
934         return ret;
935
936 done_err:
937         bio_chain_put(req_data->bio);
938         ceph_osdc_put_request(req);
939 done_pages:
940         rbd_coll_end_req(req_data, ret, len);
941         kfree(req_data);
942         return ret;
943 }
944
/*
 * Ceph osd op callback
 *
 * Completion handler for asynchronous requests from rbd_do_request():
 * parses the osd reply, zero-fills reads of nonexistent objects and
 * the tail of short reads, completes the collection slot, and frees
 * the request resources.
 */
static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct rbd_request *req_data = req->r_priv;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	__s32 rc;
	u64 bytes;
	int read_op;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);
	read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);

	dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);

	if (rc == -ENOENT && read_op) {
		/* reading an object that doesn't exist returns zeroes */
		zero_bio_chain(req_data->bio, 0);
		rc = 0;
	} else if (rc == 0 && read_op && bytes < req_data->len) {
		/* short read: zero the tail, report the full length */
		zero_bio_chain(req_data->bio, bytes);
		bytes = req_data->len;
	}

	rbd_coll_end_req(req_data, rc, bytes);

	if (req_data->bio)
		bio_chain_put(req_data->bio);

	ceph_osdc_put_request(req);
	kfree(req_data);
}
983
/*
 * Minimal completion callback: just drop the request reference.
 * Used for fire-and-forget requests (e.g. notify acks) that carry no
 * per-request state.
 */
static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	ceph_osdc_put_request(req);
}
988
/*
 * Do a synchronous ceph osd operation.
 *
 * Builds a single read/write op (unless the caller supplied @orig_ops),
 * backs it with a freshly allocated page vector, and issues it through
 * rbd_do_request() with no completion callback — which makes
 * rbd_do_request() wait for the reply.  Data is copied between @buf and
 * the page vector according to @flags.
 *
 * @linger_req and @ver are passed through to rbd_do_request().
 * Returns bytes read for reads, 0 or a negative errno otherwise.
 */
static int rbd_req_sync_op(struct rbd_device *dev,
			   struct ceph_snap_context *snapc,
			   u64 snapid,
			   int opcode,
			   int flags,
			   struct ceph_osd_req_op *orig_ops,
			   int num_reply,
			   const char *obj,
			   u64 ofs, u64 len,
			   char *buf,
			   struct ceph_osd_request **linger_req,
			   u64 *ver)
{
	int ret;
	struct page **pages;
	int num_pages;
	struct ceph_osd_req_op *ops = orig_ops;
	u32 payload_len;

	num_pages = calc_pages_for(ofs , len);
	pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
	if (IS_ERR(pages))
		return PTR_ERR(pages);

	if (!orig_ops) {
		/* build one op; writes carry their payload inline */
		payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
		ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
		if (ret < 0)
			goto done;

		if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
			ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
			if (ret < 0)
				goto done_ops;
		}
	}

	/*
	 * NOTE(review): the literal 2 below is passed as num_reply while
	 * the caller-supplied @num_reply parameter goes unused — confirm
	 * which is intended.
	 */
	ret = rbd_do_request(NULL, dev, snapc, snapid,
			  obj, ofs, len, NULL,
			  pages, num_pages,
			  flags,
			  ops,
			  2,
			  NULL, 0,
			  NULL,
			  linger_req, ver);
	if (ret < 0)
		goto done_ops;

	/* for reads, ret is the byte count the osd actually returned */
	if ((flags & CEPH_OSD_FLAG_READ) && buf)
		ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);

done_ops:
	if (!orig_ops)
		rbd_destroy_ops(ops);	/* only free ops we built ourselves */
done:
	ceph_release_page_vector(pages, num_pages);
	return ret;
}
1051
/*
 * Do an asynchronous ceph osd operation.
 *
 * Maps the image-relative range [@ofs, @ofs + @len) onto a single
 * object segment, builds one read/write op for it, and fires it off
 * via rbd_do_request() with rbd_req_cb() as the completion callback.
 * The result is reported through slot @coll_index of @coll.
 */
static int rbd_do_op(struct request *rq,
		     struct rbd_device *rbd_dev ,
		     struct ceph_snap_context *snapc,
		     u64 snapid,
		     int opcode, int flags, int num_reply,
		     u64 ofs, u64 len,
		     struct bio *bio,
		     struct rbd_req_coll *coll,
		     int coll_index)
{
	char *seg_name;
	u64 seg_ofs;
	u64 seg_len;
	int ret;
	struct ceph_osd_req_op *ops;
	u32 payload_len;

	seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
	if (!seg_name)
		return -ENOMEM;

	/* translate the image offset into an object name + offset */
	seg_len = rbd_get_segment(&rbd_dev->header,
				  rbd_dev->header.block_name,
				  ofs, len,
				  seg_name, &seg_ofs);

	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);

	ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
	if (ret < 0)
		goto done;

	/* we've taken care of segment sizes earlier when we
	   cloned the bios. We should never have a segment
	   truncated at this point */
	BUG_ON(seg_len < len);

	ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
			     seg_name, seg_ofs, seg_len,
			     bio,
			     NULL, 0,
			     flags,
			     ops,
			     num_reply,
			     coll, coll_index,
			     rbd_req_cb, 0, NULL);

	/* the osd request keeps its own copy; ops can go now */
	rbd_destroy_ops(ops);
done:
	kfree(seg_name);
	return ret;
}
1107
/*
 * Request async osd write
 *
 * Wrapper around rbd_do_op(): writes always target the head
 * (CEPH_NOSNAP) under snap context @snapc, and ask for on-disk
 * acknowledgement.
 */
static int rbd_req_write(struct request *rq,
			 struct rbd_device *rbd_dev,
			 struct ceph_snap_context *snapc,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
			 CEPH_OSD_OP_WRITE,
			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1125
/*
 * Request async osd read
 *
 * Wrapper around rbd_do_op(); a zero @snapid means "read the head"
 * (CEPH_NOSNAP).
 */
static int rbd_req_read(struct request *rq,
			 struct rbd_device *rbd_dev,
			 u64 snapid,
			 u64 ofs, u64 len,
			 struct bio *bio,
			 struct rbd_req_coll *coll,
			 int coll_index)
{
	return rbd_do_op(rq, rbd_dev, NULL,
			 (snapid ? snapid : CEPH_NOSNAP),
			 CEPH_OSD_OP_READ,
			 CEPH_OSD_FLAG_READ,
			 2,
			 ofs, len, bio, coll, coll_index);
}
1144
/*
 * Request sync osd read
 *
 * NOTE(review): @snapc is accepted but never used — rbd_req_sync_op()
 * is handed a NULL snap context; confirm whether the parameter can be
 * dropped.
 */
static int rbd_req_sync_read(struct rbd_device *dev,
			  struct ceph_snap_context *snapc,
			  u64 snapid,
			  const char *obj,
			  u64 ofs, u64 len,
			  char *buf,
			  u64 *ver)
{
	return rbd_req_sync_op(dev, NULL,
			       (snapid ? snapid : CEPH_NOSNAP),
			       CEPH_OSD_OP_READ,
			       CEPH_OSD_FLAG_READ,
			       NULL,
			       1, obj, ofs, len, buf, NULL, ver);
}
1163
/*
 * Acknowledge a notification received on a watched object so the osd
 * does not time out the watcher.  Sent asynchronously; completion just
 * drops the request (rbd_simple_req_cb).
 *
 * NOTE(review): the @ver parameter is unused — the ack carries
 * dev->header.obj_version instead; confirm that is intended.
 */
static int rbd_req_sync_notify_ack(struct rbd_device *dev,
				   u64 ver,
				   u64 notify_id,
				   const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct page **pages = NULL;
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
	/*
	 * NOTE(review): notify_id is stored without cpu_to_le64() while
	 * other cookie assignments in this file byte-swap — confirm the
	 * caller already passes it in wire byte order.
	 */
	ops[0].watch.cookie = notify_id;
	ops[0].watch.flag = 0;

	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
			  obj, 0, 0, NULL,
			  pages, 0,
			  CEPH_OSD_FLAG_READ,
			  ops,
			  1,
			  NULL, 0,
			  rbd_simple_req_cb, 0, NULL);

	rbd_destroy_ops(ops);
	return ret;
}
1196
/*
 * Notify handler for the header watch: something changed the header
 * object (e.g. a snapshot was created), so refresh the in-core
 * snapshot context and then ack the notification.
 */
static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
{
	struct rbd_device *dev = (struct rbd_device *)data;
	int rc;

	if (!dev)
		return;

	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
		notify_id, (int)opcode);
	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	rc = __rbd_update_snaps(dev);
	mutex_unlock(&ctl_mutex);
	if (rc)
		pr_warning(DRV_NAME "%d got notification but failed to update"
			   " snaps: %d\n", dev->major, rc);

	/* ack even if the refresh failed, so the watch stays alive */
	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
}
1216
/*
 * Request sync osd watch
 *
 * Register a watch on object @obj so header changes are delivered to
 * rbd_watch_cb().  The request is made lingering (via
 * dev->watch_request) so the osd client re-establishes it as needed.
 */
static int rbd_req_sync_watch(struct rbd_device *dev,
			      const char *obj,
			      u64 ver)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	/* event dispatches notifies to rbd_watch_cb with dev as context */
	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
				     (void *)dev, &dev->watch_event);
	if (ret < 0)
		goto fail;

	ops[0].watch.ver = cpu_to_le64(ver);
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 1;	/* 1 == establish the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL,
			      &dev->watch_request, NULL);

	if (ret < 0)
		goto fail_event;

	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1261
/*
 * Request sync osd unwatch
 *
 * Tear down the watch established by rbd_req_sync_watch() and release
 * the associated event.  The event is cancelled even if the unwatch op
 * itself failed.
 */
static int rbd_req_sync_unwatch(struct rbd_device *dev,
				const char *obj)
{
	struct ceph_osd_req_op *ops;

	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
	if (ret < 0)
		return ret;

	ops[0].watch.ver = 0;
	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
	ops[0].watch.flag = 0;	/* 0 == remove the watch */

	ret = rbd_req_sync_op(dev, NULL,
			      CEPH_NOSNAP,
			      0,
			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			      ops,
			      1, obj, 0, 0, NULL, NULL, NULL);

	rbd_destroy_ops(ops);
	ceph_osdc_cancel_event(dev->watch_event);
	dev->watch_event = NULL;
	return ret;
}
1290
/* context registered with ceph_osdc_create_event() for rbd_notify_cb() */
struct rbd_notify_info {
	struct rbd_device *dev;
};
1294
1295 static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
1296 {
1297         struct rbd_device *dev = (struct rbd_device *)data;
1298         if (!dev)
1299                 return;
1300
1301         dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
1302                 notify_id, (int)opcode);
1303 }
1304
/*
 * Request sync osd notify
 *
 * Send a notify on @obj (waking all watchers, including ourselves) and
 * wait for it to complete or time out.  The one-shot event is created
 * with a stack-local rbd_notify_info as its context, so this function
 * must not return before the wait finishes.
 */
static int rbd_req_sync_notify(struct rbd_device *dev,
			  const char *obj)
{
	struct ceph_osd_req_op *ops;
	struct ceph_osd_client *osdc = &dev->rbd_client->client->osdc;
	struct ceph_osd_event *event;
	struct rbd_notify_info info;
	int payload_len = sizeof(u32) + sizeof(u32);	/* ver + timeout */
	int ret;

	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
	if (ret < 0)
		return ret;

	info.dev = dev;

	/* one_shot == 1: the event fires at most once, for our notify */
	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
				     (void *)&info, &event);
	if (ret < 0)
		goto fail;

	/*
	 * NOTE(review): unlike other sites in this file, ver and cookie
	 * are assigned without cpu_to_le64() — confirm the intended byte
	 * order.  The 12 second timeout is a magic number.
	 */
	ops[0].watch.ver = 1;
	ops[0].watch.flag = 1;
	ops[0].watch.cookie = event->cookie;
	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
	ops[0].watch.timeout = 12;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, NULL);
	if (ret < 0)
		goto fail_event;

	/* wait for our own notification to come back around */
	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
	dout("ceph_osdc_wait_event returned %d\n", ret);
	rbd_destroy_ops(ops);
	return 0;

fail_event:
	ceph_osdc_cancel_event(event);
fail:
	rbd_destroy_ops(ops);
	return ret;
}
1355
/*
 * Synchronously invoke an osd class method ("exec") on @obj:
 * calls @cls.@method with @len bytes of @data as input.  The object
 * version from the reply is returned through @ver.
 */
static int rbd_req_sync_exec(struct rbd_device *dev,
			     const char *obj,
			     const char *cls,
			     const char *method,
			     const char *data,
			     int len,
			     u64 *ver)
{
	struct ceph_osd_req_op *ops;
	int cls_len = strlen(cls);
	int method_len = strlen(method);
	/* payload carries class name + method name + input data */
	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
				    cls_len + method_len + len);
	if (ret < 0)
		return ret;

	ops[0].cls.class_name = cls;
	ops[0].cls.class_len = (__u8)cls_len;
	ops[0].cls.method_name = method;
	ops[0].cls.method_len = (__u8)method_len;
	ops[0].cls.argc = 0;
	ops[0].cls.indata = data;
	ops[0].cls.indata_len = len;

	ret = rbd_req_sync_op(dev, NULL,
			       CEPH_NOSNAP,
			       0,
			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
			       ops,
			       1, obj, 0, 0, NULL, NULL, ver);

	rbd_destroy_ops(ops);

	dout("cls_exec returned %d\n", ret);
	return ret;
}
1395
1396 static struct rbd_req_coll *rbd_alloc_coll(int num_reqs)
1397 {
1398         struct rbd_req_coll *coll =
1399                         kzalloc(sizeof(struct rbd_req_coll) +
1400                                 sizeof(struct rbd_req_status) * num_reqs,
1401                                 GFP_ATOMIC);
1402
1403         if (!coll)
1404                 return NULL;
1405         coll->total = num_reqs;
1406         kref_init(&coll->kref);
1407         return coll;
1408 }
1409
/*
 * block device queue callback
 *
 * Drains the request queue: each filesystem request is split along
 * rados object boundaries, its bio chain cloned per segment, and the
 * pieces submitted as asynchronous osd reads/writes tracked by an
 * rbd_req_coll so the block request completes only once every segment
 * has finished.
 *
 * Entered with q->queue_lock held; the lock is dropped around the
 * (possibly sleeping) osd submission path and re-taken before
 * completing requests or fetching the next one.
 */
static void rbd_rq_fn(struct request_queue *q)
{
	struct rbd_device *rbd_dev = q->queuedata;
	struct request *rq;
	struct bio_pair *bp = NULL;

	rq = blk_fetch_request(q);

	while (1) {
		struct bio *bio;
		struct bio *rq_bio, *next_bio = NULL;
		bool do_write;
		int size, op_size = 0;
		u64 ofs;
		int num_segs, cur_seg = 0;
		struct rbd_req_coll *coll;

		/* peek at request from block layer */
		if (!rq)
			break;

		dout("fetched request\n");

		/* filter out block requests we don't understand */
		if ((rq->cmd_type != REQ_TYPE_FS)) {
			__blk_end_request_all(rq, 0);
			goto next;
		}

		/* deduce our operation (read, write) */
		do_write = (rq_data_dir(rq) == WRITE);

		size = blk_rq_bytes(rq);
		ofs = blk_rq_pos(rq) * 512ULL;
		rq_bio = rq->bio;
		if (do_write && rbd_dev->read_only) {
			__blk_end_request_all(rq, -EROFS);
			goto next;
		}

		/* drop the queue lock: the osd path below can sleep */
		spin_unlock_irq(q->queue_lock);

		dout("%s 0x%x bytes at 0x%llx\n",
		     do_write ? "write" : "read",
		     size, blk_rq_pos(rq) * 512ULL);

		num_segs = rbd_get_num_segments(&rbd_dev->header, ofs, size);
		coll = rbd_alloc_coll(num_segs);
		if (!coll) {
			spin_lock_irq(q->queue_lock);
			__blk_end_request_all(rq, -ENOMEM);
			goto next;
		}

		do {
			/* a bio clone to be passed down to OSD req */
			dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
			op_size = rbd_get_segment(&rbd_dev->header,
						  rbd_dev->header.block_name,
						  ofs, size,
						  NULL, NULL);
			/* one collection ref per in-flight segment */
			kref_get(&coll->kref);
			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
					      op_size, GFP_ATOMIC);
			if (!bio) {
				/* mark this segment failed, keep going */
				rbd_coll_end_req_index(rq, coll, cur_seg,
						       -ENOMEM, op_size);
				goto next_seg;
			}


			/* init OSD command: write or read */
			if (do_write)
				rbd_req_write(rq, rbd_dev,
					      rbd_dev->header.snapc,
					      ofs,
					      op_size, bio,
					      coll, cur_seg);
			else
				rbd_req_read(rq, rbd_dev,
					     cur_snap_id(rbd_dev),
					     ofs,
					     op_size, bio,
					     coll, cur_seg);

next_seg:
			size -= op_size;
			ofs += op_size;

			cur_seg++;
			rq_bio = next_bio;
		} while (size > 0);
		/* drop the allocation ref; in-flight segments hold theirs */
		kref_put(&coll->kref, rbd_coll_release);

		if (bp)
			bio_pair_release(bp);
		/* re-acquire before touching the queue again */
		spin_lock_irq(q->queue_lock);
next:
		rq = blk_fetch_request(q);
	}
}
1514
/*
 * a queue callback. Makes sure that we don't create a bio that spans across
 * multiple osd objects. One exception would be with a single page bios,
 * which we handle later at bio_chain_clone
 *
 * Returns the number of bytes of @bvec that may be appended to the bio
 * described by @bmd without crossing an object boundary.
 */
static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
			  struct bio_vec *bvec)
{
	struct rbd_device *rbd_dev = q->queuedata;
	/* object size in sectors; obj_order is log2 of the object size */
	unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
	sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
	unsigned int bio_sectors = bmd->bi_size >> 9;
	int max;

	/* bytes remaining in the object past the bio's current end */
	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
				 + bio_sectors)) << 9;
	if (max < 0)
		max = 0; /* bio_add cannot handle a negative return */
	if (max <= bvec->bv_len && bio_sectors == 0)
		return bvec->bv_len;	/* empty bio: always accept one bvec */
	return max;
}
1537
/*
 * Undo rbd_init_disk(): free the in-core header and tear down the
 * gendisk and its request queue.  Safe to call when no disk was ever
 * allocated.
 */
static void rbd_free_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk = rbd_dev->disk;

	if (!disk)
		return;

	rbd_header_free(&rbd_dev->header);

	if (disk->flags & GENHD_FL_UP)
		del_gendisk(disk);	/* only if it was actually added */
	if (disk->queue)
		blk_cleanup_queue(disk->queue);
	put_disk(disk);
}
1553
/*
 * (Re)read the on-disk image header into @header.
 *
 * The header's size depends on its snapshot count, which we only learn
 * from the header itself: read with the current size guess and, if the
 * snapshot count changed in the meantime, free everything and retry
 * with the new sizes until we obtain a self-consistent copy.
 */
static int rbd_read_header(struct rbd_device *rbd_dev,
			   struct rbd_image_header *header)
{
	ssize_t rc;
	struct rbd_image_header_ondisk *dh;
	int snap_count = 0;
	u64 snap_names_len = 0;
	u64 ver;

	while (1) {
		/* buffer sized for header + snap records + snap names */
		int len = sizeof(*dh) +
			  snap_count * sizeof(struct rbd_image_snap_ondisk) +
			  snap_names_len;

		rc = -ENOMEM;
		dh = kmalloc(len, GFP_KERNEL);
		if (!dh)
			return -ENOMEM;

		rc = rbd_req_sync_read(rbd_dev,
				       NULL, CEPH_NOSNAP,
				       rbd_dev->obj_md_name,
				       0, len,
				       (char *)dh, &ver);
		if (rc < 0)
			goto out_dh;

		rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
		if (rc < 0) {
			if (rc == -ENXIO) {
				pr_warning("unrecognized header format"
					   " for image %s", rbd_dev->obj);
			}
			goto out_dh;
		}

		/* raced with snap create/remove? retry with new sizes */
		if (snap_count != header->total_snaps) {
			snap_count = header->total_snaps;
			snap_names_len = header->snap_names_len;
			rbd_header_free(header);
			kfree(dh);
			continue;
		}
		break;
	}
	header->obj_version = ver;

out_dh:
	kfree(dh);
	return rc;
}
1608
1609 /*
1610  * create a snapshot
1611  */
1612 static int rbd_header_add_snap(struct rbd_device *dev,
1613                                const char *snap_name,
1614                                gfp_t gfp_flags)
1615 {
1616         int name_len = strlen(snap_name);
1617         u64 new_snapid;
1618         int ret;
1619         void *data, *p, *e;
1620         u64 ver;
1621         struct ceph_mon_client *monc;
1622
1623         /* we should create a snapshot only if we're pointing at the head */
1624         if (dev->cur_snap)
1625                 return -EINVAL;
1626
1627         monc = &dev->rbd_client->client->monc;
1628         ret = ceph_monc_create_snapid(monc, dev->poolid, &new_snapid);
1629         dout("created snapid=%lld\n", new_snapid);
1630         if (ret < 0)
1631                 return ret;
1632
1633         data = kmalloc(name_len + 16, gfp_flags);
1634         if (!data)
1635                 return -ENOMEM;
1636
1637         p = data;
1638         e = data + name_len + 16;
1639
1640         ceph_encode_string_safe(&p, e, snap_name, name_len, bad);
1641         ceph_encode_64_safe(&p, e, new_snapid, bad);
1642
1643         ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1644                                 data, p - data, &ver);
1645
1646         kfree(data);
1647
1648         if (ret < 0)
1649                 return ret;
1650
1651         dev->header.snapc->seq =  new_snapid;
1652
1653         return 0;
1654 bad:
1655         return -ERANGE;
1656 }
1657
1658 static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
1659 {
1660         struct rbd_snap *snap;
1661
1662         while (!list_empty(&rbd_dev->snaps)) {
1663                 snap = list_first_entry(&rbd_dev->snaps, struct rbd_snap, node);
1664                 __rbd_remove_snap_dev(rbd_dev, snap);
1665         }
1666 }
1667
/*
 * Re-read the header from the osd and swap the fresh snapshot context
 * into rbd_dev->header, then rebuild the snapshot device list.
 *
 * If we were pointing at the head (seq == newest snap id) we keep
 * following the head after the update; otherwise the previous seq is
 * preserved.  The header fields are exchanged under snap_rwsem held
 * for write.
 */
static int __rbd_update_snaps(struct rbd_device *rbd_dev)
{
	int ret;
	struct rbd_image_header h;
	u64 snap_seq;
	int follow_seq = 0;

	ret = rbd_read_header(rbd_dev, &h);
	if (ret < 0)
		return ret;

	/* resized? */
	set_capacity(rbd_dev->disk, h.image_size / 512ULL);

	down_write(&rbd_dev->header.snap_rwsem);

	snap_seq = rbd_dev->header.snapc->seq;
	if (rbd_dev->header.total_snaps &&
	    rbd_dev->header.snapc->snaps[0] == snap_seq)
		/* pointing at the head, will need to follow that
		   if head moves */
		follow_seq = 1;

	/* free the old copies; h supplies the replacements below */
	kfree(rbd_dev->header.snapc);
	kfree(rbd_dev->header.snap_names);
	kfree(rbd_dev->header.snap_sizes);

	rbd_dev->header.total_snaps = h.total_snaps;
	rbd_dev->header.snapc = h.snapc;
	rbd_dev->header.snap_names = h.snap_names;
	rbd_dev->header.snap_names_len = h.snap_names_len;
	rbd_dev->header.snap_sizes = h.snap_sizes;
	if (follow_seq)
		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
	else
		rbd_dev->header.snapc->seq = snap_seq;

	ret = __rbd_init_snaps_header(rbd_dev);

	up_write(&rbd_dev->header.snap_rwsem);

	return ret;
}
1714
/*
 * Set up the block device for a newly mapped image: read the header,
 * build the snapshot list, select the mapped snapshot, then create the
 * gendisk and request queue (limits sized to the rados object size)
 * and announce the disk.
 */
static int rbd_init_disk(struct rbd_device *rbd_dev)
{
	struct gendisk *disk;
	struct request_queue *q;
	int rc;
	u64 total_size = 0;

	/* contact OSD, request size info about the object being mapped */
	rc = rbd_read_header(rbd_dev, &rbd_dev->header);
	if (rc)
		return rc;

	/* no need to lock here, as rbd_dev is not registered yet */
	rc = __rbd_init_snaps_header(rbd_dev);
	if (rc)
		return rc;

	rc = rbd_header_set_snap(rbd_dev, &total_size);
	if (rc)
		return rc;

	/* create gendisk info */
	rc = -ENOMEM;
	disk = alloc_disk(RBD_MINORS_PER_MAJOR);
	if (!disk)
		goto out;

	snprintf(disk->disk_name, sizeof(disk->disk_name), DRV_NAME "%d",
		 rbd_dev->id);
	disk->major = rbd_dev->major;
	disk->first_minor = 0;
	disk->fops = &rbd_bd_ops;
	disk->private_data = rbd_dev;

	/* init rq */
	rc = -ENOMEM;
	q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
	if (!q)
		goto out_disk;

	/* set io sizes to object size */
	blk_queue_max_hw_sectors(q, rbd_obj_bytes(&rbd_dev->header) / 512ULL);
	blk_queue_max_segment_size(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_min(q, rbd_obj_bytes(&rbd_dev->header));
	blk_queue_io_opt(q, rbd_obj_bytes(&rbd_dev->header));

	/* keep bios within a single object (see rbd_merge_bvec) */
	blk_queue_merge_bvec(q, rbd_merge_bvec);
	disk->queue = q;

	q->queuedata = rbd_dev;

	rbd_dev->disk = disk;
	rbd_dev->q = q;

	/* finally, announce the disk to the world */
	set_capacity(disk, total_size / 512ULL);
	add_disk(disk);

	pr_info("%s: added with size 0x%llx\n",
		disk->disk_name, (unsigned long long)total_size);
	return 0;

out_disk:
	put_disk(disk);
out:
	return rc;
}
1782
1783 /*
1784   sysfs
1785 */
1786
/* sysfs: current image size in bytes */
static ssize_t rbd_size_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%llu\n", (unsigned long long)rbd_dev->header.image_size);
}
1794
/* sysfs: block device major number */
static ssize_t rbd_major_show(struct device *dev,
			      struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%d\n", rbd_dev->major);
}
1802
/* sysfs: ceph client instance id ("client<N>") */
static ssize_t rbd_client_id_show(struct device *dev,
				  struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "client%lld\n",
			ceph_client_id(rbd_dev->rbd_client->client));
}
1811
/* sysfs: name of the rados pool the image lives in */
static ssize_t rbd_pool_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->pool_name);
}
1819
/* sysfs: rbd image (object) name */
static ssize_t rbd_name_show(struct device *dev,
			     struct device_attribute *attr, char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->obj);
}
1827
/* sysfs: name of the currently mapped snapshot ("-" for the head) */
static ssize_t rbd_snap_show(struct device *dev,
			     struct device_attribute *attr,
			     char *buf)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);

	return sprintf(buf, "%s\n", rbd_dev->snap_name);
}
1836
/*
 * sysfs store for the "refresh" attribute: writing anything forces a
 * header re-read (the same work the watch callback performs).
 */
static ssize_t rbd_image_refresh(struct device *dev,
				 struct device_attribute *attr,
				 const char *buf,
				 size_t size)
{
	struct rbd_device *rbd_dev = dev_to_rbd(dev);
	int rc;
	int ret = size;	/* report the whole write consumed on success */

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);

	rc = __rbd_update_snaps(rbd_dev);
	if (rc < 0)
		ret = rc;

	mutex_unlock(&ctl_mutex);
	return ret;
}
1855
/* per-device sysfs attributes for each mapped rbd image */
static DEVICE_ATTR(size, S_IRUGO, rbd_size_show, NULL);
static DEVICE_ATTR(major, S_IRUGO, rbd_major_show, NULL);
static DEVICE_ATTR(client_id, S_IRUGO, rbd_client_id_show, NULL);
static DEVICE_ATTR(pool, S_IRUGO, rbd_pool_show, NULL);
static DEVICE_ATTR(name, S_IRUGO, rbd_name_show, NULL);
static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
static DEVICE_ATTR(create_snap, S_IWUSR, NULL, rbd_snap_add);
1864
1865 static struct attribute *rbd_attrs[] = {
1866         &dev_attr_size.attr,
1867         &dev_attr_major.attr,
1868         &dev_attr_client_id.attr,
1869         &dev_attr_pool.attr,
1870         &dev_attr_name.attr,
1871         &dev_attr_current_snap.attr,
1872         &dev_attr_refresh.attr,
1873         &dev_attr_create_snap.attr,
1874         NULL
1875 };
1876
1877 static struct attribute_group rbd_attr_group = {
1878         .attrs = rbd_attrs,
1879 };
1880
1881 static const struct attribute_group *rbd_attr_groups[] = {
1882         &rbd_attr_group,
1883         NULL
1884 };
1885
1886 static void rbd_sysfs_dev_release(struct device *dev)
1887 {
1888 }
1889
1890 static struct device_type rbd_device_type = {
1891         .name           = "rbd",
1892         .groups         = rbd_attr_groups,
1893         .release        = rbd_sysfs_dev_release,
1894 };
1895
1896
1897 /*
1898   sysfs - snapshots
1899 */
1900
1901 static ssize_t rbd_snap_size_show(struct device *dev,
1902                                   struct device_attribute *attr,
1903                                   char *buf)
1904 {
1905         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1906
1907         return sprintf(buf, "%lld\n", (long long)snap->size);
1908 }
1909
1910 static ssize_t rbd_snap_id_show(struct device *dev,
1911                                 struct device_attribute *attr,
1912                                 char *buf)
1913 {
1914         struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
1915
1916         return sprintf(buf, "%lld\n", (long long)snap->id);
1917 }
1918
/* Attribute files in each snapshot's sysfs directory (snap_<name>/). */
static DEVICE_ATTR(snap_size, S_IRUGO, rbd_snap_size_show, NULL);
static DEVICE_ATTR(snap_id, S_IRUGO, rbd_snap_id_show, NULL);

static struct attribute *rbd_snap_attrs[] = {
	&dev_attr_snap_size.attr,
	&dev_attr_snap_id.attr,
	NULL,
};

static struct attribute_group rbd_snap_attr_group = {
	.attrs = rbd_snap_attrs,
};

/*
 * Release callback for snapshot devices: frees the rbd_snap (and its
 * kstrdup'ed name) once the last sysfs reference is dropped.
 */
static void rbd_snap_dev_release(struct device *dev)
{
	struct rbd_snap *snap = container_of(dev, struct rbd_snap, dev);
	kfree(snap->name);
	kfree(snap);
}

static const struct attribute_group *rbd_snap_attr_groups[] = {
	&rbd_snap_attr_group,
	NULL
};

static struct device_type rbd_snap_device_type = {
	.groups		= rbd_snap_attr_groups,
	.release	= rbd_snap_dev_release,
};
1948
/*
 * Drop one snapshot: unlink it from rbd_dev->snaps and unregister its
 * sysfs device.  The rbd_snap itself is freed by rbd_snap_dev_release()
 * when the final device reference goes away.
 */
static void __rbd_remove_snap_dev(struct rbd_device *rbd_dev,
				  struct rbd_snap *snap)
{
	list_del(&snap->node);
	device_unregister(&snap->dev);
}
1955
1956 static int rbd_register_snap_dev(struct rbd_device *rbd_dev,
1957                                   struct rbd_snap *snap,
1958                                   struct device *parent)
1959 {
1960         struct device *dev = &snap->dev;
1961         int ret;
1962
1963         dev->type = &rbd_snap_device_type;
1964         dev->parent = parent;
1965         dev->release = rbd_snap_dev_release;
1966         dev_set_name(dev, "snap_%s", snap->name);
1967         ret = device_register(dev);
1968
1969         return ret;
1970 }
1971
1972 static int __rbd_add_snap_dev(struct rbd_device *rbd_dev,
1973                               int i, const char *name,
1974                               struct rbd_snap **snapp)
1975 {
1976         int ret;
1977         struct rbd_snap *snap = kzalloc(sizeof(*snap), GFP_KERNEL);
1978         if (!snap)
1979                 return -ENOMEM;
1980         snap->name = kstrdup(name, GFP_KERNEL);
1981         snap->size = rbd_dev->header.snap_sizes[i];
1982         snap->id = rbd_dev->header.snapc->snaps[i];
1983         if (device_is_registered(&rbd_dev->dev)) {
1984                 ret = rbd_register_snap_dev(rbd_dev, snap,
1985                                              &rbd_dev->dev);
1986                 if (ret < 0)
1987                         goto err;
1988         }
1989         *snapp = snap;
1990         return 0;
1991 err:
1992         kfree(snap->name);
1993         kfree(snap);
1994         return ret;
1995 }
1996
/*
 * Find the start of the snapshot name that precedes @name in a
 * NUL-delimited list beginning at @start.  @name points at (or one
 * past) the entry after the one we want.  Returns NULL when there is
 * no previous entry.
 */
const char *rbd_prev_snap_name(const char *name, const char *start)
{
	const char *p;

	/* need room for at least one character plus its NUL terminator */
	if (name < start + 2)
		return NULL;

	/* step over the previous entry's NUL onto its last character */
	for (p = name - 2; *p; p--)
		if (p == start)
			return start;

	/* p stopped on the NUL that ends the entry before it */
	return p + 1;
}
2013
/*
 * Merge the snapshot list we already have (rbd_dev->snaps, oldest to
 * newest) with the snapshot context in the freshly read header, which
 * stores snapshots in reverse order (newest to oldest).  We therefore
 * walk our list newest-first and consume the header arrays from the
 * back, so that a snapshot that was removed and recreated under the
 * same name does not produce a duplicate entry.
 *
 * Returns 0 on success, -EINVAL if the packed name list is shorter
 * than the snap count claims, or an error from __rbd_add_snap_dev().
 */
static int __rbd_init_snaps_header(struct rbd_device *rbd_dev)
{
	const char *name, *first_name;
	int i = rbd_dev->header.total_snaps;	/* header snaps not yet matched */
	struct rbd_snap *snap, *old_snap = NULL;
	int ret;
	struct list_head *p, *n;

	/* names are packed NUL-delimited; start one past the end and walk back */
	first_name = rbd_dev->header.snap_names;
	name = first_name + rbd_dev->header.snap_names_len;

	list_for_each_prev_safe(p, n, &rbd_dev->snaps) {
		u64 cur_id;

		old_snap = list_entry(p, struct rbd_snap, node);

		/* cur_id is only read below when i != 0 (short-circuit below) */
		if (i)
			cur_id = rbd_dev->header.snapc->snaps[i - 1];

		if (!i || old_snap->id < cur_id) {
			/* old_snap->id was skipped, thus was removed */
			__rbd_remove_snap_dev(rbd_dev, old_snap);
			continue;
		}
		if (old_snap->id == cur_id) {
			/* we have this snapshot already */
			i--;
			name = rbd_prev_snap_name(name, first_name);
			continue;
		}
		/* header has snaps newer than old_snap: insert them before it */
		for (; i > 0;
		     i--, name = rbd_prev_snap_name(name, first_name)) {
			if (!name) {
				/* ran out of names before running out of ids */
				WARN_ON(1);
				return -EINVAL;
			}
			cur_id = rbd_dev->header.snapc->snaps[i];
			/* snapshot removal? handle it above */
			if (cur_id >= old_snap->id)
				break;
			/* a new snapshot */
			ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
			if (ret < 0)
				return ret;

			/* note that we add it backward so using n and not p */
			list_add(&snap->node, n);
			p = &snap->node;
		}
	}
	/* we're done going over the old snap list, just add what's left */
	for (; i > 0; i--) {
		name = rbd_prev_snap_name(name, first_name);
		if (!name) {
			WARN_ON(1);
			return -EINVAL;
		}
		ret = __rbd_add_snap_dev(rbd_dev, i - 1, name, &snap);
		if (ret < 0)
			return ret;
		list_add(&snap->node, &rbd_dev->snaps);
	}

	return 0;
}
2087
2088
/* rbd_root_dev has static storage; nothing to free when released. */
static void rbd_root_dev_release(struct device *dev)
{
}

/* Parent of every rbd device: shows up as /sys/devices/rbd. */
static struct device rbd_root_dev = {
	.init_name =	"rbd",
	.release =	rbd_root_dev_release,
};
2097
/*
 * Register rbd_dev on the rbd bus (creating /sys/bus/rbd/devices/<id>)
 * and register a sysfs device for every snapshot already on its list.
 * Serialized against other control operations by ctl_mutex.
 */
static int rbd_bus_add_dev(struct rbd_device *rbd_dev)
{
	int ret = -ENOMEM;
	struct device *dev;
	struct rbd_snap *snap;

	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
	dev = &rbd_dev->dev;

	dev->bus = &rbd_bus_type;
	dev->type = &rbd_device_type;
	dev->parent = &rbd_root_dev;
	/* real teardown lives in rbd_dev_release(), not the type release */
	dev->release = rbd_dev_release;
	dev_set_name(dev, "%d", rbd_dev->id);
	ret = device_register(dev);
	if (ret < 0)
		goto done_free;	/* NOTE(review): label name is misleading; nothing is freed here */

	list_for_each_entry(snap, &rbd_dev->snaps, node) {
		ret = rbd_register_snap_dev(rbd_dev, snap,
					     &rbd_dev->dev);
		if (ret < 0)
			/*
			 * NOTE(review): this error is swallowed -- we still
			 * return 0 below with only some snaps registered.
			 * Fixing it requires unwinding the registered device,
			 * whose release frees rbd_dev; confirm callers first.
			 */
			break;
	}

	mutex_unlock(&ctl_mutex);
	return 0;
done_free:
	mutex_unlock(&ctl_mutex);
	return ret;
}
2129
/*
 * Remove the device from sysfs; when the last reference drops, the
 * driver core calls rbd_dev_release(), which frees the rbd_device.
 */
static void rbd_bus_del_dev(struct rbd_device *rbd_dev)
{
	device_unregister(&rbd_dev->dev);
}
2134
2135 static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
2136 {
2137         int ret, rc;
2138
2139         do {
2140                 ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
2141                                          rbd_dev->header.obj_version);
2142                 if (ret == -ERANGE) {
2143                         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2144                         rc = __rbd_update_snaps(rbd_dev);
2145                         mutex_unlock(&ctl_mutex);
2146                         if (rc < 0)
2147                                 return rc;
2148                 }
2149         } while (ret == -ERANGE);
2150
2151         return ret;
2152 }
2153
/* Highest device id handed out so far; 0 means none allocated yet. */
static atomic64_t rbd_id_max = ATOMIC64_INIT(0);

/*
 * Get a unique rbd identifier.  The minimum rbd id is 1.
 */
static int rbd_id_get(void)
{
	/*
	 * NOTE(review): atomic64_inc_return() yields a 64-bit value that
	 * is narrowed to int here; overflow is theoretical but unguarded.
	 */
	return atomic64_inc_return(&rbd_id_max);
}

/*
 * Record that an rbd identifier is no longer in use.
 */
static void rbd_id_put(int rbd_id)
{
	BUG_ON(rbd_id < 1);

	/*
	 * New id's are always one more than the current maximum.
	 * If the id being "put" *is* that maximum, decrement the
	 * maximum so the next one requested just reuses this one.
	 */
	atomic64_cmpxchg(&rbd_id_max, rbd_id, rbd_id - 1);
}
2178
2179 static ssize_t rbd_add(struct bus_type *bus,
2180                        const char *buf,
2181                        size_t count)
2182 {
2183         struct ceph_osd_client *osdc;
2184         struct rbd_device *rbd_dev;
2185         ssize_t rc = -ENOMEM;
2186         int irc;
2187         char *mon_dev_name;
2188         char *options;
2189
2190         if (!try_module_get(THIS_MODULE))
2191                 return -ENODEV;
2192
2193         mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2194         if (!mon_dev_name)
2195                 goto err_out_mod;
2196
2197         options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
2198         if (!options)
2199                 goto err_mon_dev;
2200
2201         /* new rbd_device object */
2202         rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
2203         if (!rbd_dev)
2204                 goto err_out_opt;
2205
2206         /* static rbd_device initialization */
2207         spin_lock_init(&rbd_dev->lock);
2208         INIT_LIST_HEAD(&rbd_dev->node);
2209         INIT_LIST_HEAD(&rbd_dev->snaps);
2210
2211         init_rwsem(&rbd_dev->header.snap_rwsem);
2212
2213         /* generate unique id: one more than highest used so far */
2214         rbd_dev->id = rbd_id_get();
2215
2216         /* add to global list */
2217         spin_lock(&rbd_dev_list_lock);
2218         list_add_tail(&rbd_dev->node, &rbd_dev_list);
2219         spin_unlock(&rbd_dev_list_lock);
2220
2221         /* parse add command */
2222         if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
2223                    "%" __stringify(RBD_MAX_OPT_LEN) "s "
2224                    "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
2225                    "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
2226                    "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
2227                    mon_dev_name, options, rbd_dev->pool_name,
2228                    rbd_dev->obj, rbd_dev->snap_name) < 4) {
2229                 rc = -EINVAL;
2230                 goto err_out_slot;
2231         }
2232
2233         if (rbd_dev->snap_name[0] == 0)
2234                 memcpy(rbd_dev->snap_name, RBD_SNAP_HEAD_NAME,
2235                         sizeof (RBD_SNAP_HEAD_NAME));
2236
2237         rbd_dev->obj_len = strlen(rbd_dev->obj);
2238         snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
2239                  rbd_dev->obj, RBD_SUFFIX);
2240
2241         /* initialize rest of new object */
2242         snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
2243
2244         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2245         rc = rbd_get_client(rbd_dev, mon_dev_name, options);
2246         mutex_unlock(&ctl_mutex);
2247
2248         if (rc < 0)
2249                 goto err_out_slot;
2250
2251         /* pick the pool */
2252         osdc = &rbd_dev->rbd_client->client->osdc;
2253         rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
2254         if (rc < 0)
2255                 goto err_out_client;
2256         rbd_dev->poolid = rc;
2257
2258         /* register our block device */
2259         irc = register_blkdev(0, rbd_dev->name);
2260         if (irc < 0) {
2261                 rc = irc;
2262                 goto err_out_client;
2263         }
2264         rbd_dev->major = irc;
2265
2266         rc = rbd_bus_add_dev(rbd_dev);
2267         if (rc)
2268                 goto err_out_blkdev;
2269
2270         /* set up and announce blkdev mapping */
2271         rc = rbd_init_disk(rbd_dev);
2272         if (rc)
2273                 goto err_out_bus;
2274
2275         rc = rbd_init_watch_dev(rbd_dev);
2276         if (rc)
2277                 goto err_out_bus;
2278
2279         return count;
2280
2281 err_out_bus:
2282         spin_lock(&rbd_dev_list_lock);
2283         list_del_init(&rbd_dev->node);
2284         spin_unlock(&rbd_dev_list_lock);
2285         rbd_id_put(target_id);
2286
2287         /* this will also clean up rest of rbd_dev stuff */
2288
2289         rbd_bus_del_dev(rbd_dev);
2290         kfree(options);
2291         kfree(mon_dev_name);
2292         return rc;
2293
2294 err_out_blkdev:
2295         unregister_blkdev(rbd_dev->major, rbd_dev->name);
2296 err_out_client:
2297         rbd_put_client(rbd_dev);
2298 err_out_slot:
2299         spin_lock(&rbd_dev_list_lock);
2300         list_del_init(&rbd_dev->node);
2301         spin_unlock(&rbd_dev_list_lock);
2302         rbd_id_put(target_id);
2303
2304         kfree(rbd_dev);
2305 err_out_opt:
2306         kfree(options);
2307 err_mon_dev:
2308         kfree(mon_dev_name);
2309 err_out_mod:
2310         dout("Error adding device %s\n", buf);
2311         module_put(THIS_MODULE);
2312         return rc;
2313 }
2314
2315 static struct rbd_device *__rbd_get_dev(unsigned long id)
2316 {
2317         struct list_head *tmp;
2318         struct rbd_device *rbd_dev;
2319
2320         spin_lock(&rbd_dev_list_lock);
2321         list_for_each(tmp, &rbd_dev_list) {
2322                 rbd_dev = list_entry(tmp, struct rbd_device, node);
2323                 if (rbd_dev->id == id) {
2324                         spin_unlock(&rbd_dev_list_lock);
2325                         return rbd_dev;
2326                 }
2327         }
2328         spin_unlock(&rbd_dev_list_lock);
2329         return NULL;
2330 }
2331
/*
 * Release callback for an rbd device, invoked by the driver core once
 * the last reference to rbd_dev->dev is gone (after device_unregister()
 * in rbd_bus_del_dev()).  Tears down the watch, the ceph client, the
 * disk and the blkdev registration, then frees the rbd_device and
 * drops the module reference taken in rbd_add().
 */
static void rbd_dev_release(struct device *dev)
{
	struct rbd_device *rbd_dev =
			container_of(dev, struct rbd_device, dev);

	/* cancel the lingering watch request before unwatching */
	if (rbd_dev->watch_request) {
		struct ceph_client *client = rbd_dev->rbd_client->client;

		ceph_osdc_unregister_linger_request(&client->osdc,
						    rbd_dev->watch_request);
	}
	if (rbd_dev->watch_event)
		rbd_req_sync_unwatch(rbd_dev, rbd_dev->obj_md_name);

	rbd_put_client(rbd_dev);

	/* clean up and free blkdev */
	rbd_free_disk(rbd_dev);
	unregister_blkdev(rbd_dev->major, rbd_dev->name);
	kfree(rbd_dev);

	/* release module ref */
	module_put(THIS_MODULE);
}
2356
2357 static ssize_t rbd_remove(struct bus_type *bus,
2358                           const char *buf,
2359                           size_t count)
2360 {
2361         struct rbd_device *rbd_dev = NULL;
2362         int target_id, rc;
2363         unsigned long ul;
2364         int ret = count;
2365
2366         rc = strict_strtoul(buf, 10, &ul);
2367         if (rc)
2368                 return rc;
2369
2370         /* convert to int; abort if we lost anything in the conversion */
2371         target_id = (int) ul;
2372         if (target_id != ul)
2373                 return -EINVAL;
2374
2375         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2376
2377         rbd_dev = __rbd_get_dev(target_id);
2378         if (!rbd_dev) {
2379                 ret = -ENOENT;
2380                 goto done;
2381         }
2382
2383         spin_lock(&rbd_dev_list_lock);
2384         list_del_init(&rbd_dev->node);
2385         spin_unlock(&rbd_dev_list_lock);
2386
2387         rbd_id_put(target_id);
2388
2389         __rbd_remove_all_snaps(rbd_dev);
2390         rbd_bus_del_dev(rbd_dev);
2391
2392 done:
2393         mutex_unlock(&ctl_mutex);
2394         return ret;
2395 }
2396
2397 static ssize_t rbd_snap_add(struct device *dev,
2398                             struct device_attribute *attr,
2399                             const char *buf,
2400                             size_t count)
2401 {
2402         struct rbd_device *rbd_dev = dev_to_rbd(dev);
2403         int ret;
2404         char *name = kmalloc(count + 1, GFP_KERNEL);
2405         if (!name)
2406                 return -ENOMEM;
2407
2408         snprintf(name, count, "%s", buf);
2409
2410         mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
2411
2412         ret = rbd_header_add_snap(rbd_dev,
2413                                   name, GFP_KERNEL);
2414         if (ret < 0)
2415                 goto err_unlock;
2416
2417         ret = __rbd_update_snaps(rbd_dev);
2418         if (ret < 0)
2419                 goto err_unlock;
2420
2421         /* shouldn't hold ctl_mutex when notifying.. notify might
2422            trigger a watch callback that would need to get that mutex */
2423         mutex_unlock(&ctl_mutex);
2424
2425         /* make a best effort, don't error if failed */
2426         rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
2427
2428         ret = count;
2429         kfree(name);
2430         return ret;
2431
2432 err_unlock:
2433         mutex_unlock(&ctl_mutex);
2434         kfree(name);
2435         return ret;
2436 }
2437
/* Bus-level control files: /sys/bus/rbd/add and /sys/bus/rbd/remove. */
static struct bus_attribute rbd_bus_attrs[] = {
	__ATTR(add, S_IWUSR, NULL, rbd_add),
	__ATTR(remove, S_IWUSR, NULL, rbd_remove),
	__ATTR_NULL
};
2443
2444 /*
2445  * create control files in sysfs
2446  * /sys/bus/rbd/...
2447  */
2448 static int rbd_sysfs_init(void)
2449 {
2450         int ret;
2451
2452         rbd_bus_type.bus_attrs = rbd_bus_attrs;
2453
2454         ret = bus_register(&rbd_bus_type);
2455         if (ret < 0)
2456                 return ret;
2457
2458         ret = device_register(&rbd_root_dev);
2459
2460         return ret;
2461 }
2462
/* Remove the sysfs control interface created by rbd_sysfs_init(). */
static void rbd_sysfs_cleanup(void)
{
	device_unregister(&rbd_root_dev);
	bus_unregister(&rbd_bus_type);
}
2468
2469 int __init rbd_init(void)
2470 {
2471         int rc;
2472
2473         rc = rbd_sysfs_init();
2474         if (rc)
2475                 return rc;
2476         pr_info("loaded " DRV_NAME_LONG "\n");
2477         return 0;
2478 }
2479
/* Module unload: tear down the sysfs bus and root device. */
void __exit rbd_exit(void)
{
	rbd_sysfs_cleanup();
}

module_init(rbd_init);
module_exit(rbd_exit);

MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
MODULE_DESCRIPTION("rados block device");

/* following authorship retained from original osdblk.c */
MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");

MODULE_LICENSE("GPL");