drbd: Fixes from the drbd-8.3 branch
[cascardo/linux.git] / drivers/block/drbd/drbd_main.c
1 /*
2    drbd.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11    from Logicworks, Inc. for making SDP replication support possible.
12
13    drbd is free software; you can redistribute it and/or modify
14    it under the terms of the GNU General Public License as published by
15    the Free Software Foundation; either version 2, or (at your option)
16    any later version.
17
18    drbd is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21    GNU General Public License for more details.
22
23    You should have received a copy of the GNU General Public License
24    along with drbd; see the file COPYING.  If not, write to
25    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26
27  */
28
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
33 #include <net/sock.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
36 #include <linux/fs.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
40 #include <linux/mm.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
48
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
52
53 #include <linux/drbd_limits.h>
54 #include "drbd_int.h"
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
56
57 #include "drbd_vli.h"
58
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
63
64 int drbd_init(void);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
71
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73               "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78                  __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
80
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not as a module),
85  * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
90
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
92 int enable_faults;
93 int fault_rate;
94 static int fault_count;
95 int fault_devs;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
104 #endif
105
106 /* module parameters; their actual definitions */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
109 int allow_oos;
110 int proc_details;       /* Detail level in /proc/drbd */
111
112 /* Module parameter for setting the user mode helper program
113  * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
115
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
117
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119  * as member "struct gendisk *vdisk;"
120  */
121 struct idr minors;
122 struct list_head drbd_tconns;  /* list of struct drbd_tconn */
123
124 struct kmem_cache *drbd_request_cache;
125 struct kmem_cache *drbd_ee_cache;       /* peer requests */
126 struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
127 struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
128 mempool_t *drbd_request_mempool;
129 mempool_t *drbd_ee_mempool;
130 mempool_t *drbd_md_io_page_pool;
131 struct bio_set *drbd_md_io_bio_set;
132
133 /* I do not use a standard mempool, because:
134    1) I want to hand out the pre-allocated objects first.
135    2) I want to be able to interrupt sleeping allocation with a signal.
136    Note: This is a singly linked list; the next pointer is stored in the
137          private member of struct page.
138  */
139 struct page *drbd_pp_pool;
140 spinlock_t   drbd_pp_lock;
141 int          drbd_pp_vacant;
142 wait_queue_head_t drbd_pp_wait;
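/*
 * A minimal, illustrative sketch (not part of the driver): with the next
 * pointer kept in the page's private member as described above, popping one
 * pre-allocated page off the pool head under drbd_pp_lock would look roughly
 * like this.  The real pool helpers live elsewhere in DRBD (drbd_receiver.c);
 * the function name below is made up for illustration only.
 *
 *	static struct page *pp_pop_one_example(void)
 *	{
 *		struct page *page;
 *
 *		spin_lock(&drbd_pp_lock);
 *		page = drbd_pp_pool;
 *		if (page) {
 *			drbd_pp_pool = (struct page *)page_private(page);
 *			set_page_private(page, 0);
 *			drbd_pp_vacant--;
 *		}
 *		spin_unlock(&drbd_pp_lock);
 *		return page;
 *	}
 */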
143
144 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
145
146 static const struct block_device_operations drbd_ops = {
147         .owner =   THIS_MODULE,
148         .open =    drbd_open,
149         .release = drbd_release,
150 };
151
152 static void bio_destructor_drbd(struct bio *bio)
153 {
154         bio_free(bio, drbd_md_io_bio_set);
155 }
156
157 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
158 {
159         struct bio *bio;
160
161         if (!drbd_md_io_bio_set)
162                 return bio_alloc(gfp_mask, 1);
163
164         bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
165         if (!bio)
166                 return NULL;
167         bio->bi_destructor = bio_destructor_drbd;
168         return bio;
169 }
170
171 #ifdef __CHECKER__
172 /* When checking with sparse, and this is an inline function, sparse will
173    give tons of false positives. When this is a real function, sparse works.
174  */
175 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
176 {
177         int io_allowed;
178
179         atomic_inc(&mdev->local_cnt);
180         io_allowed = (mdev->state.disk >= mins);
181         if (!io_allowed) {
182                 if (atomic_dec_and_test(&mdev->local_cnt))
183                         wake_up(&mdev->misc_wait);
184         }
185         return io_allowed;
186 }
187
188 #endif
189
190 /**
191  * DOC: The transfer log
192  *
193  * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
194  * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
195  * of the list. There is always at least one &struct drbd_tl_epoch object.
196  *
197  * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
198  * attached.
199  */
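/*
 * To make the shape described above concrete (a sketch only, not used by the
 * code below; visit_epoch() is a placeholder): epochs are chained from
 * oldest_tle towards newest_tle via ->next, so a complete walk of the
 * transfer log is simply
 *
 *	struct drbd_tl_epoch *b;
 *
 *	for (b = tconn->oldest_tle; b != NULL; b = b->next)
 *		visit_epoch(b);		// b->requests holds that epoch's requests
 *
 * which is the pattern used by _tl_restart() and tl_abort_disk_io() further
 * down in this file.
 */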
200 static int tl_init(struct drbd_tconn *tconn)
201 {
202         struct drbd_tl_epoch *b;
203
204         /* during device minor initialization, we may well use GFP_KERNEL */
205         b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
206         if (!b)
207                 return 0;
208         INIT_LIST_HEAD(&b->requests);
209         INIT_LIST_HEAD(&b->w.list);
210         b->next = NULL;
211         b->br_number = 4711;
212         b->n_writes = 0;
213         b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
214
215         tconn->oldest_tle = b;
216         tconn->newest_tle = b;
217         INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
218         INIT_LIST_HEAD(&tconn->barrier_acked_requests);
219
220         return 1;
221 }
222
223 static void tl_cleanup(struct drbd_tconn *tconn)
224 {
225         if (tconn->oldest_tle != tconn->newest_tle)
226                 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227         if (!list_empty(&tconn->out_of_sequence_requests))
228                 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229         kfree(tconn->oldest_tle);
230         tconn->oldest_tle = NULL;
231         kfree(tconn->unused_spare_tle);
232         tconn->unused_spare_tle = NULL;
233 }
234
235 /**
236  * _tl_add_barrier() - Adds a barrier to the transfer log
237  * @tconn:      DRBD connection.
238  * @new:        Barrier to be added before the current head of the TL.
239  *
240  * The caller must hold the req_lock.
241  */
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
243 {
244         struct drbd_tl_epoch *newest_before;
245
246         INIT_LIST_HEAD(&new->requests);
247         INIT_LIST_HEAD(&new->w.list);
248         new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
249         new->next = NULL;
250         new->n_writes = 0;
251
252         newest_before = tconn->newest_tle;
253         /* never send a barrier number == 0, because that is special-cased
254          * when using TCQ for our write ordering code */
255         new->br_number = (newest_before->br_number+1) ?: 1;
256         if (tconn->newest_tle != new) {
257                 tconn->newest_tle->next = new;
258                 tconn->newest_tle = new;
259         }
260 }
261
262 /**
263  * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264  * @tconn:      DRBD connection.
265  * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266  * @set_size:   Expected number of requests before that barrier.
267  *
268  * In case the passed barrier_nr or set_size does not match the oldest
269  * &struct drbd_tl_epoch object, this function will cause a termination
270  * of the connection.
271  */
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273                 unsigned int set_size)
274 {
275         struct drbd_conf *mdev;
276         struct drbd_tl_epoch *b, *nob; /* next old barrier */
277         struct list_head *le, *tle;
278         struct drbd_request *r;
279
280         spin_lock_irq(&tconn->req_lock);
281
282         b = tconn->oldest_tle;
283
284         /* first some paranoia code */
285         if (b == NULL) {
286                 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
287                          barrier_nr);
288                 goto bail;
289         }
290         if (b->br_number != barrier_nr) {
291                 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292                          barrier_nr, b->br_number);
293                 goto bail;
294         }
295         if (b->n_writes != set_size) {
296                 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297                          barrier_nr, set_size, b->n_writes);
298                 goto bail;
299         }
300
301         /* Clean up list of requests processed during current epoch */
302         list_for_each_safe(le, tle, &b->requests) {
303                 r = list_entry(le, struct drbd_request, tl_requests);
304                 _req_mod(r, BARRIER_ACKED);
305         }
306         /* There could be requests on the list waiting for completion
307            of the write to the local disk. To avoid corruption of the
308            slab's data structures we have to remove the list's head.
309
310            Also there could have been a barrier ack out of sequence, overtaking
311            the write acks - which would be a bug and violate write ordering.
312            To not deadlock in case we lose connection while such requests are
313            still pending, we need some way to find them for the
314            _req_mod(CONNECTION_LOST_WHILE_PENDING).
315
316            These have been list_move'd to the out_of_sequence_requests list in
317            _req_mod(, BARRIER_ACKED) above.
318            */
319         list_splice_init(&b->requests, &tconn->barrier_acked_requests);
320         mdev = b->w.mdev;
321
322         nob = b->next;
323         if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324                 _tl_add_barrier(tconn, b);
325                 if (nob)
326                         tconn->oldest_tle = nob;
327                 /* if nob == NULL b was the only barrier, and becomes the new
328                    barrier. Therefore tconn->oldest_tle already points to b */
329         } else {
330                 D_ASSERT(nob != NULL);
331                 tconn->oldest_tle = nob;
332                 kfree(b);
333         }
334
335         spin_unlock_irq(&tconn->req_lock);
336         dec_ap_pending(mdev);
337
338         return;
339
340 bail:
341         spin_unlock_irq(&tconn->req_lock);
342         conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
343 }
344
345
346 /**
347  * _tl_restart() - Walks the transfer log and applies an action to all requests
348  * @tconn:      DRBD connection.
349  * @what:       The action/event to perform with all request objects
350  *
351  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352  * RESTART_FROZEN_DISK_IO.
353  */
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
355 {
356         struct drbd_tl_epoch *b, *tmp, **pn;
357         struct list_head *le, *tle, carry_reads;
358         struct drbd_request *req;
359         int rv, n_writes, n_reads;
360
361         b = tconn->oldest_tle;
362         pn = &tconn->oldest_tle;
363         while (b) {
364                 n_writes = 0;
365                 n_reads = 0;
366                 INIT_LIST_HEAD(&carry_reads);
367                 list_for_each_safe(le, tle, &b->requests) {
368                         req = list_entry(le, struct drbd_request, tl_requests);
369                         rv = _req_mod(req, what);
370
371                         if (rv & MR_WRITE)
372                                 n_writes++;
373                         if (rv & MR_READ)
374                                 n_reads++;
375                 }
376                 tmp = b->next;
377
378                 if (n_writes) {
379                         if (what == RESEND) {
380                                 b->n_writes = n_writes;
381                                 if (b->w.cb == NULL) {
382                                         b->w.cb = w_send_barrier;
383                                         inc_ap_pending(b->w.mdev);
384                                         set_bit(CREATE_BARRIER, &b->w.mdev->flags);
385                                 }
386
387                                 drbd_queue_work(&tconn->data.work, &b->w);
388                         }
389                         pn = &b->next;
390                 } else {
391                         if (n_reads)
392                                 list_add(&carry_reads, &b->requests);
393                         /* there could still be requests on that ring list,
394                          * in case local io is still pending */
395                         list_del(&b->requests);
396
397                         /* dec_ap_pending corresponding to queue_barrier.
398                          * the newest barrier may not have been queued yet,
399                          * in which case w.cb is still NULL. */
400                         if (b->w.cb != NULL)
401                                 dec_ap_pending(b->w.mdev);
402
403                         if (b == tconn->newest_tle) {
404                                 /* recycle, but reinit! */
405                                 if (tmp != NULL)
406                                         conn_err(tconn, "ASSERT FAILED tmp == NULL");
407                                 INIT_LIST_HEAD(&b->requests);
408                                 list_splice(&carry_reads, &b->requests);
409                                 INIT_LIST_HEAD(&b->w.list);
410                                 b->w.cb = NULL;
411                                 b->br_number = net_random();
412                                 b->n_writes = 0;
413
414                                 *pn = b;
415                                 break;
416                         }
417                         *pn = tmp;
418                         kfree(b);
419                 }
420                 b = tmp;
421                 list_splice(&carry_reads, &b->requests);
422         }
423
424         /* Actions operating on the disk state also want to work on
425            requests that got barrier acked. */
426         switch (what) {
427         case FAIL_FROZEN_DISK_IO:
428         case RESTART_FROZEN_DISK_IO:
429                 list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
430                         req = list_entry(le, struct drbd_request, tl_requests);
431                         _req_mod(req, what);
432                 }
433         case CONNECTION_LOST_WHILE_PENDING:
434         case RESEND:
435                 break;
436         default:
437                 conn_err(tconn, "what = %d in _tl_restart()\n", what);
438         }
439 }
440
441 /**
442  * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
443  * @tconn:      DRBD connection.
444  *
445  * This is called after the connection to the peer was lost. The storage covered
446  * by the requests on the transfer log gets marked as out of sync. Called from the
447  * receiver thread and the worker thread.
448  */
449 void tl_clear(struct drbd_tconn *tconn)
450 {
451         struct drbd_conf *mdev;
452         struct list_head *le, *tle;
453         struct drbd_request *r;
454         int vnr;
455
456         spin_lock_irq(&tconn->req_lock);
457
458         _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
459
460         /* we expect this list to be empty. */
461         if (!list_empty(&tconn->out_of_sequence_requests))
462                 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
463
464         /* but just in case, clean it up anyway! */
465         list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
466                 r = list_entry(le, struct drbd_request, tl_requests);
467                 /* It would be nice to complete outside of spinlock.
468                  * But this is easier for now. */
469                 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
470         }
471
472         /* ensure bit indicating barrier is required is clear */
473         rcu_read_lock();
474         idr_for_each_entry(&tconn->volumes, mdev, vnr)
475                 clear_bit(CREATE_BARRIER, &mdev->flags);
476         rcu_read_unlock();
477
478         spin_unlock_irq(&tconn->req_lock);
479 }
480
481 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
482 {
483         spin_lock_irq(&tconn->req_lock);
484         _tl_restart(tconn, what);
485         spin_unlock_irq(&tconn->req_lock);
486 }
487
488 /**
489  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain mdev in the TL
490  * @mdev:       DRBD device.
491  */
492 void tl_abort_disk_io(struct drbd_conf *mdev)
493 {
494         struct drbd_tconn *tconn = mdev->tconn;
495         struct drbd_tl_epoch *b;
496         struct list_head *le, *tle;
497         struct drbd_request *req;
498
499         spin_lock_irq(&tconn->req_lock);
500         b = tconn->oldest_tle;
501         while (b) {
502                 list_for_each_safe(le, tle, &b->requests) {
503                         req = list_entry(le, struct drbd_request, tl_requests);
504                         if (!(req->rq_state & RQ_LOCAL_PENDING))
505                                 continue;
506                         if (req->w.mdev == mdev)
507                                 _req_mod(req, ABORT_DISK_IO);
508                 }
509                 b = b->next;
510         }
511
512         list_for_each_safe(le, tle, &tconn->barrier_acked_requests) {
513                 req = list_entry(le, struct drbd_request, tl_requests);
514                 if (!(req->rq_state & RQ_LOCAL_PENDING))
515                         continue;
516                 if (req->w.mdev == mdev)
517                         _req_mod(req, ABORT_DISK_IO);
518         }
519
520         spin_unlock_irq(&tconn->req_lock);
521 }
522
523 static int drbd_thread_setup(void *arg)
524 {
525         struct drbd_thread *thi = (struct drbd_thread *) arg;
526         struct drbd_tconn *tconn = thi->tconn;
527         unsigned long flags;
528         int retval;
529
530         snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
531                  thi->name[0], thi->tconn->name);
532
533 restart:
534         retval = thi->function(thi);
535
536         spin_lock_irqsave(&thi->t_lock, flags);
537
538         /* if the receiver has been "EXITING", the last thing it did
539          * was set the conn state to "StandAlone",
540          * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
541          * and the receiver thread will be "started".
542          * drbd_thread_start needs to set "RESTARTING" in that case.
543          * t_state check and assignment need to be within the same spinlock,
544          * so either thread_start sees EXITING, and can remap to RESTARTING,
545          * or thread_start sees NONE, and can proceed as normal.
546          */
547
548         if (thi->t_state == RESTARTING) {
549                 conn_info(tconn, "Restarting %s thread\n", thi->name);
550                 thi->t_state = RUNNING;
551                 spin_unlock_irqrestore(&thi->t_lock, flags);
552                 goto restart;
553         }
554
555         thi->task = NULL;
556         thi->t_state = NONE;
557         smp_mb();
558         complete_all(&thi->stop);
559         spin_unlock_irqrestore(&thi->t_lock, flags);
560
561         conn_info(tconn, "Terminating %s\n", current->comm);
562
563         /* Release mod reference taken when thread was started */
564
565         kref_put(&tconn->kref, &conn_destroy);
566         module_put(THIS_MODULE);
567         return retval;
568 }
569
570 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
571                              int (*func) (struct drbd_thread *), char *name)
572 {
573         spin_lock_init(&thi->t_lock);
574         thi->task    = NULL;
575         thi->t_state = NONE;
576         thi->function = func;
577         thi->tconn = tconn;
578         strncpy(thi->name, name, ARRAY_SIZE(thi->name));
579 }
580
581 int drbd_thread_start(struct drbd_thread *thi)
582 {
583         struct drbd_tconn *tconn = thi->tconn;
584         struct task_struct *nt;
585         unsigned long flags;
586
587         /* is used from state engine doing drbd_thread_stop_nowait,
588          * while holding the req lock irqsave */
589         spin_lock_irqsave(&thi->t_lock, flags);
590
591         switch (thi->t_state) {
592         case NONE:
593                 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
594                          thi->name, current->comm, current->pid);
595
596                 /* Get ref on module for thread - this is released when thread exits */
597                 if (!try_module_get(THIS_MODULE)) {
598                         conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
599                         spin_unlock_irqrestore(&thi->t_lock, flags);
600                         return false;
601                 }
602
603                 kref_get(&thi->tconn->kref);
604
605                 init_completion(&thi->stop);
606                 thi->reset_cpu_mask = 1;
607                 thi->t_state = RUNNING;
608                 spin_unlock_irqrestore(&thi->t_lock, flags);
609                 flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
610
611                 nt = kthread_create(drbd_thread_setup, (void *) thi,
612                                     "drbd_%c_%s", thi->name[0], thi->tconn->name);
613
614                 if (IS_ERR(nt)) {
615                         conn_err(tconn, "Couldn't start thread\n");
616
617                         kref_put(&tconn->kref, &conn_destroy);
618                         module_put(THIS_MODULE);
619                         return false;
620                 }
621                 spin_lock_irqsave(&thi->t_lock, flags);
622                 thi->task = nt;
623                 thi->t_state = RUNNING;
624                 spin_unlock_irqrestore(&thi->t_lock, flags);
625                 wake_up_process(nt);
626                 break;
627         case EXITING:
628                 thi->t_state = RESTARTING;
629                 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
630                                 thi->name, current->comm, current->pid);
631                 /* fall through */
632         case RUNNING:
633         case RESTARTING:
634         default:
635                 spin_unlock_irqrestore(&thi->t_lock, flags);
636                 break;
637         }
638
639         return true;
640 }
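/*
 * Typical usage elsewhere in the driver (a hedged sketch of how the thread
 * helpers above are combined, e.g. when a connection object is set up): each
 * per-connection thread is first described with drbd_thread_init() and only
 * started on demand:
 *
 *	drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
 *	drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
 *	drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
 *	...
 *	drbd_thread_start(&tconn->worker);
 *
 * drbd_thread_start() takes a module reference and a kref on the connection;
 * both are dropped again in drbd_thread_setup() when the thread finally exits.
 */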
641
642
643 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
644 {
645         unsigned long flags;
646
647         enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
648
649         /* may be called from state engine, holding the req lock irqsave */
650         spin_lock_irqsave(&thi->t_lock, flags);
651
652         if (thi->t_state == NONE) {
653                 spin_unlock_irqrestore(&thi->t_lock, flags);
654                 if (restart)
655                         drbd_thread_start(thi);
656                 return;
657         }
658
659         if (thi->t_state != ns) {
660                 if (thi->task == NULL) {
661                         spin_unlock_irqrestore(&thi->t_lock, flags);
662                         return;
663                 }
664
665                 thi->t_state = ns;
666                 smp_mb();
667                 init_completion(&thi->stop);
668                 if (thi->task != current)
669                         force_sig(DRBD_SIGKILL, thi->task);
670         }
671
672         spin_unlock_irqrestore(&thi->t_lock, flags);
673
674         if (wait)
675                 wait_for_completion(&thi->stop);
676 }
677
678 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
679 {
680         struct drbd_thread *thi =
681                 task == tconn->receiver.task ? &tconn->receiver :
682                 task == tconn->asender.task  ? &tconn->asender :
683                 task == tconn->worker.task   ? &tconn->worker : NULL;
684
685         return thi;
686 }
687
688 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
689 {
690         struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
691         return thi ? thi->name : task->comm;
692 }
693
694 int conn_lowest_minor(struct drbd_tconn *tconn)
695 {
696         struct drbd_conf *mdev;
697         int vnr = 0, m;
698
699         rcu_read_lock();
700         mdev = idr_get_next(&tconn->volumes, &vnr);
701         m = mdev ? mdev_to_minor(mdev) : -1;
702         rcu_read_unlock();
703
704         return m;
705 }
706
707 #ifdef CONFIG_SMP
708 /**
709  * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
710  * @tconn:      DRBD connection.
711  *
712  * Forces all threads of a connection onto the same CPU. This is beneficial for
713  * DRBD's performance. May be overridden by the user's configuration.
714  */
715 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
716 {
717         int ord, cpu;
718
719         /* user override. */
720         if (cpumask_weight(tconn->cpu_mask))
721                 return;
722
723         ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
724         for_each_online_cpu(cpu) {
725                 if (ord-- == 0) {
726                         cpumask_set_cpu(cpu, tconn->cpu_mask);
727                         return;
728                 }
729         }
730         /* should not be reached */
731         cpumask_setall(tconn->cpu_mask);
732 }
733
734 /**
735  * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
736  *
737  * @thi:        drbd_thread object
738  *
739  * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
740  * prematurely.
741  */
742 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
743 {
744         struct task_struct *p = current;
745
746         if (!thi->reset_cpu_mask)
747                 return;
748         thi->reset_cpu_mask = 0;
749         set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
750 }
751 #endif
752
753 /**
754  * drbd_header_size  -  size of a packet header
755  *
756  * The header size is a multiple of 8, so any payload following the header is
757  * word aligned on 64-bit architectures.  (The bitmap send and receive code
758  * relies on this.)
759  */
760 unsigned int drbd_header_size(struct drbd_tconn *tconn)
761 {
762         if (tconn->agreed_pro_version >= 100) {
763                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
764                 return sizeof(struct p_header100);
765         } else {
766                 BUILD_BUG_ON(sizeof(struct p_header80) !=
767                              sizeof(struct p_header95));
768                 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
769                 return sizeof(struct p_header80);
770         }
771 }
772
773 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
774 {
775         h->magic   = cpu_to_be32(DRBD_MAGIC);
776         h->command = cpu_to_be16(cmd);
777         h->length  = cpu_to_be16(size);
778         return sizeof(struct p_header80);
779 }
780
781 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
782 {
783         h->magic   = cpu_to_be16(DRBD_MAGIC_BIG);
784         h->command = cpu_to_be16(cmd);
785         h->length = cpu_to_be32(size);
786         return sizeof(struct p_header95);
787 }
788
789 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
790                                       int size, int vnr)
791 {
792         h->magic = cpu_to_be32(DRBD_MAGIC_100);
793         h->volume = cpu_to_be16(vnr);
794         h->command = cpu_to_be16(cmd);
795         h->length = cpu_to_be32(size);
796         h->pad = 0;
797         return sizeof(struct p_header100);
798 }
799
800 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
801                                    void *buffer, enum drbd_packet cmd, int size)
802 {
803         if (tconn->agreed_pro_version >= 100)
804                 return prepare_header100(buffer, cmd, size, vnr);
805         else if (tconn->agreed_pro_version >= 95 &&
806                  size > DRBD_MAX_SIZE_H80_PACKET)
807                 return prepare_header95(buffer, cmd, size);
808         else
809                 return prepare_header80(buffer, cmd, size);
810 }
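/*
 * For reference, the three on-wire header formats filled in above (field
 * order as used by the prepare_header*() helpers; see the p_header* structs
 * in the DRBD headers for the authoritative, packed definitions):
 *
 *	p_header80:  u32 magic (DRBD_MAGIC),     u16 command, u16 length
 *	p_header95:  u16 magic (DRBD_MAGIC_BIG), u16 command, u32 length
 *	p_header100: u32 magic (DRBD_MAGIC_100), u16 volume,  u16 command,
 *	             u32 length, u32 pad
 *
 * prepare_header() picks the smallest format that the agreed protocol version
 * and the payload size allow; all of them are multiples of 8 bytes, which
 * drbd_header_size() relies on above.
 */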
811
812 static void *__conn_prepare_command(struct drbd_tconn *tconn,
813                                     struct drbd_socket *sock)
814 {
815         if (!sock->socket)
816                 return NULL;
817         return sock->sbuf + drbd_header_size(tconn);
818 }
819
820 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
821 {
822         void *p;
823
824         mutex_lock(&sock->mutex);
825         p = __conn_prepare_command(tconn, sock);
826         if (!p)
827                 mutex_unlock(&sock->mutex);
828
829         return p;
830 }
831
832 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
833 {
834         return conn_prepare_command(mdev->tconn, sock);
835 }
836
837 static int __send_command(struct drbd_tconn *tconn, int vnr,
838                           struct drbd_socket *sock, enum drbd_packet cmd,
839                           unsigned int header_size, void *data,
840                           unsigned int size)
841 {
842         int msg_flags;
843         int err;
844
845         /*
846          * Called with @data == NULL and the size of the data blocks in @size
847          * for commands that send data blocks.  For those commands, omit the
848          * MSG_MORE flag: this will increase the likelihood that data blocks
849          * which are page aligned on the sender will end up page aligned on the
850          * receiver.
851          */
852         msg_flags = data ? MSG_MORE : 0;
853
854         header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
855                                       header_size + size);
856         err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
857                             msg_flags);
858         if (data && !err)
859                 err = drbd_send_all(tconn, sock->socket, data, size, 0);
860         return err;
861 }
862
863 static int __conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
864                                enum drbd_packet cmd, unsigned int header_size,
865                                void *data, unsigned int size)
866 {
867         return __send_command(tconn, 0, sock, cmd, header_size, data, size);
868 }
869
870 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
871                       enum drbd_packet cmd, unsigned int header_size,
872                       void *data, unsigned int size)
873 {
874         int err;
875
876         err = __conn_send_command(tconn, sock, cmd, header_size, data, size);
877         mutex_unlock(&sock->mutex);
878         return err;
879 }
880
881 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
882                       enum drbd_packet cmd, unsigned int header_size,
883                       void *data, unsigned int size)
884 {
885         int err;
886
887         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
888                              data, size);
889         mutex_unlock(&sock->mutex);
890         return err;
891 }
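/*
 * Note on the prepare/send pairing used throughout this file:
 * conn_prepare_command()/drbd_prepare_command() take sock->mutex and return a
 * pointer into the send buffer just past the packet header (or NULL, in which
 * case the mutex has already been dropped again).  The matching
 * *_send_command() sends the packet and releases the mutex.  A caller
 * therefore looks roughly like this (illustrative sketch; field and command
 * names are placeholders):
 *
 *	p = drbd_prepare_command(mdev, sock);
 *	if (!p)
 *		return -EIO;
 *	p->some_field = cpu_to_be32(some_value);
 *	return drbd_send_command(mdev, sock, P_SOME_CMD, sizeof(*p), NULL, 0);
 *
 * drbd_send_ping() below is a minimal real instance of this pattern.
 */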
892
893 int drbd_send_ping(struct drbd_tconn *tconn)
894 {
895         struct drbd_socket *sock;
896
897         sock = &tconn->meta;
898         if (!conn_prepare_command(tconn, sock))
899                 return -EIO;
900         return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
901 }
902
903 int drbd_send_ping_ack(struct drbd_tconn *tconn)
904 {
905         struct drbd_socket *sock;
906
907         sock = &tconn->meta;
908         if (!conn_prepare_command(tconn, sock))
909                 return -EIO;
910         return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
911 }
912
913 int drbd_send_sync_param(struct drbd_conf *mdev)
914 {
915         struct drbd_socket *sock;
916         struct p_rs_param_95 *p;
917         int size;
918         const int apv = mdev->tconn->agreed_pro_version;
919         enum drbd_packet cmd;
920         struct net_conf *nc;
921         struct disk_conf *dc;
922
923         sock = &mdev->tconn->data;
924         p = drbd_prepare_command(mdev, sock);
925         if (!p)
926                 return -EIO;
927
928         rcu_read_lock();
929         nc = rcu_dereference(mdev->tconn->net_conf);
930
931         size = apv <= 87 ? sizeof(struct p_rs_param)
932                 : apv == 88 ? sizeof(struct p_rs_param)
933                         + strlen(nc->verify_alg) + 1
934                 : apv <= 94 ? sizeof(struct p_rs_param_89)
935                 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
936
937         cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
938
939         /* initialize verify_alg and csums_alg */
940         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
941
942         if (get_ldev(mdev)) {
943                 dc = rcu_dereference(mdev->ldev->disk_conf);
944                 p->resync_rate = cpu_to_be32(dc->resync_rate);
945                 p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
946                 p->c_delay_target = cpu_to_be32(dc->c_delay_target);
947                 p->c_fill_target = cpu_to_be32(dc->c_fill_target);
948                 p->c_max_rate = cpu_to_be32(dc->c_max_rate);
949                 put_ldev(mdev);
950         } else {
951                 p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
952                 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
953                 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
954                 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
955                 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
956         }
957
958         if (apv >= 88)
959                 strcpy(p->verify_alg, nc->verify_alg);
960         if (apv >= 89)
961                 strcpy(p->csums_alg, nc->csums_alg);
962         rcu_read_unlock();
963
964         return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
965 }
966
967 int __drbd_send_protocol(struct drbd_tconn *tconn, enum drbd_packet cmd)
968 {
969         struct drbd_socket *sock;
970         struct p_protocol *p;
971         struct net_conf *nc;
972         int size, cf;
973
974         sock = &tconn->data;
975         p = __conn_prepare_command(tconn, sock);
976         if (!p)
977                 return -EIO;
978
979         rcu_read_lock();
980         nc = rcu_dereference(tconn->net_conf);
981
982         if (nc->tentative && tconn->agreed_pro_version < 92) {
983                 rcu_read_unlock();
984                 mutex_unlock(&sock->mutex);
985                 conn_err(tconn, "--dry-run is not supported by peer");
986                 return -EOPNOTSUPP;
987         }
988
989         size = sizeof(*p);
990         if (tconn->agreed_pro_version >= 87)
991                 size += strlen(nc->integrity_alg) + 1;
992
993         p->protocol      = cpu_to_be32(nc->wire_protocol);
994         p->after_sb_0p   = cpu_to_be32(nc->after_sb_0p);
995         p->after_sb_1p   = cpu_to_be32(nc->after_sb_1p);
996         p->after_sb_2p   = cpu_to_be32(nc->after_sb_2p);
997         p->two_primaries = cpu_to_be32(nc->two_primaries);
998         cf = 0;
999         if (nc->discard_my_data)
1000                 cf |= CF_DISCARD_MY_DATA;
1001         if (nc->tentative)
1002                 cf |= CF_DRY_RUN;
1003         p->conn_flags    = cpu_to_be32(cf);
1004
1005         if (tconn->agreed_pro_version >= 87)
1006                 strcpy(p->integrity_alg, nc->integrity_alg);
1007         rcu_read_unlock();
1008
1009         return __conn_send_command(tconn, sock, cmd, size, NULL, 0);
1010 }
1011
1012 int drbd_send_protocol(struct drbd_tconn *tconn)
1013 {
1014         int err;
1015
1016         mutex_lock(&tconn->data.mutex);
1017         err = __drbd_send_protocol(tconn, P_PROTOCOL);
1018         mutex_unlock(&tconn->data.mutex);
1019
1020         return err;
1021 }
1022
1023 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1024 {
1025         struct drbd_socket *sock;
1026         struct p_uuids *p;
1027         int i;
1028
1029         if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1030                 return 0;
1031
1032         sock = &mdev->tconn->data;
1033         p = drbd_prepare_command(mdev, sock);
1034         if (!p) {
1035                 put_ldev(mdev);
1036                 return -EIO;
1037         }
1038         for (i = UI_CURRENT; i < UI_SIZE; i++)
1039                 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1040
1041         mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1042         p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1043         rcu_read_lock();
1044         uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->discard_my_data ? 1 : 0;
1045         rcu_read_unlock();
1046         uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1047         uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1048         p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1049
1050         put_ldev(mdev);
1051         return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
1052 }
1053
1054 int drbd_send_uuids(struct drbd_conf *mdev)
1055 {
1056         return _drbd_send_uuids(mdev, 0);
1057 }
1058
1059 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1060 {
1061         return _drbd_send_uuids(mdev, 8);
1062 }
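/*
 * For readability, the uuid_flags bits sent in p->uuid[UI_FLAGS] above, as
 * set in _drbd_send_uuids() and its callers:
 *
 *	1  ask the peer to discard its data (net_conf->discard_my_data)
 *	2  we are coming up after a crashed primary (CRASHED_PRIMARY)
 *	4  our disk is D_INCONSISTENT while negotiating
 *	8  skip the initial sync (drbd_send_uuids_skip_initial_sync())
 */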
1063
1064 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
1065 {
1066         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1067                 u64 *uuid = mdev->ldev->md.uuid;
1068                 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
1069                      text,
1070                      (unsigned long long)uuid[UI_CURRENT],
1071                      (unsigned long long)uuid[UI_BITMAP],
1072                      (unsigned long long)uuid[UI_HISTORY_START],
1073                      (unsigned long long)uuid[UI_HISTORY_END]);
1074                 put_ldev(mdev);
1075         } else {
1076                 dev_info(DEV, "%s effective data uuid: %016llX\n",
1077                                 text,
1078                                 (unsigned long long)mdev->ed_uuid);
1079         }
1080 }
1081
1082 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1083 {
1084         struct drbd_socket *sock;
1085         struct p_rs_uuid *p;
1086         u64 uuid;
1087
1088         D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1089
1090         uuid = mdev->ldev->md.uuid[UI_BITMAP];
1091         if (uuid && uuid != UUID_JUST_CREATED)
1092                 uuid = uuid + UUID_NEW_BM_OFFSET;
1093         else
1094                 get_random_bytes(&uuid, sizeof(u64));
1095         drbd_uuid_set(mdev, UI_BITMAP, uuid);
1096         drbd_print_uuids(mdev, "updated sync UUID");
1097         drbd_md_sync(mdev);
1098
1099         sock = &mdev->tconn->data;
1100         p = drbd_prepare_command(mdev, sock);
1101         if (p) {
1102                 p->uuid = cpu_to_be64(uuid);
1103                 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1104         }
1105 }
1106
1107 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1108 {
1109         struct drbd_socket *sock;
1110         struct p_sizes *p;
1111         sector_t d_size, u_size;
1112         int q_order_type, max_bio_size;
1113
1114         if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1115                 D_ASSERT(mdev->ldev->backing_bdev);
1116                 d_size = drbd_get_max_capacity(mdev->ldev);
1117                 rcu_read_lock();
1118                 u_size = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
1119                 rcu_read_unlock();
1120                 q_order_type = drbd_queue_order_type(mdev);
1121                 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1122                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1123                 put_ldev(mdev);
1124         } else {
1125                 d_size = 0;
1126                 u_size = 0;
1127                 q_order_type = QUEUE_ORDERED_NONE;
1128                 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1129         }
1130
1131         sock = &mdev->tconn->data;
1132         p = drbd_prepare_command(mdev, sock);
1133         if (!p)
1134                 return -EIO;
1135
1136         if (mdev->tconn->agreed_pro_version <= 94)
1137                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
1138         else if (mdev->tconn->agreed_pro_version < 100)
1139                 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE_P95);
1140
1141         p->d_size = cpu_to_be64(d_size);
1142         p->u_size = cpu_to_be64(u_size);
1143         p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1144         p->max_bio_size = cpu_to_be32(max_bio_size);
1145         p->queue_order_type = cpu_to_be16(q_order_type);
1146         p->dds_flags = cpu_to_be16(flags);
1147         return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1148 }
1149
1150 /**
1151  * drbd_send_current_state() - Sends the drbd state to the peer
1152  * @mdev:       DRBD device.
1153  */
1154 int drbd_send_current_state(struct drbd_conf *mdev)
1155 {
1156         struct drbd_socket *sock;
1157         struct p_state *p;
1158
1159         sock = &mdev->tconn->data;
1160         p = drbd_prepare_command(mdev, sock);
1161         if (!p)
1162                 return -EIO;
1163         p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1164         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1165 }
1166
1167 /**
1168  * drbd_send_state() - After a state change, sends the new state to the peer
1169  * @mdev:      DRBD device.
1170  * @state:     the state to send, not necessarily the current state.
1171  *
1172  * Each state change queues an "after_state_ch" work, which will eventually
1173  * send the resulting new state to the peer. If more state changes happen
1174  * between queuing and processing of the after_state_ch work, we still
1175  * want to send each intermediary state in the order it occurred.
1176  */
1177 int drbd_send_state(struct drbd_conf *mdev, union drbd_state state)
1178 {
1179         struct drbd_socket *sock;
1180         struct p_state *p;
1181
1182         sock = &mdev->tconn->data;
1183         p = drbd_prepare_command(mdev, sock);
1184         if (!p)
1185                 return -EIO;
1186         p->state = cpu_to_be32(state.i); /* Within the send mutex */
1187         return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1188 }
1189
1190 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1191 {
1192         struct drbd_socket *sock;
1193         struct p_req_state *p;
1194
1195         sock = &mdev->tconn->data;
1196         p = drbd_prepare_command(mdev, sock);
1197         if (!p)
1198                 return -EIO;
1199         p->mask = cpu_to_be32(mask.i);
1200         p->val = cpu_to_be32(val.i);
1201         return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1202 }
1203
1204 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1205 {
1206         enum drbd_packet cmd;
1207         struct drbd_socket *sock;
1208         struct p_req_state *p;
1209
1210         cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1211         sock = &tconn->data;
1212         p = conn_prepare_command(tconn, sock);
1213         if (!p)
1214                 return -EIO;
1215         p->mask = cpu_to_be32(mask.i);
1216         p->val = cpu_to_be32(val.i);
1217         return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1218 }
1219
1220 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1221 {
1222         struct drbd_socket *sock;
1223         struct p_req_state_reply *p;
1224
1225         sock = &mdev->tconn->meta;
1226         p = drbd_prepare_command(mdev, sock);
1227         if (p) {
1228                 p->retcode = cpu_to_be32(retcode);
1229                 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1230         }
1231 }
1232
1233 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1234 {
1235         struct drbd_socket *sock;
1236         struct p_req_state_reply *p;
1237         enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1238
1239         sock = &tconn->meta;
1240         p = conn_prepare_command(tconn, sock);
1241         if (p) {
1242                 p->retcode = cpu_to_be32(retcode);
1243                 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1244         }
1245 }
1246
1247 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1248 {
1249         BUG_ON(code & ~0xf);
1250         p->encoding = (p->encoding & ~0xf) | code;
1251 }
1252
1253 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1254 {
1255         p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1256 }
1257
1258 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1259 {
1260         BUG_ON(n & ~0x7);
1261         p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1262 }
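/*
 * Layout of the p_compressed_bm encoding byte manipulated by the three
 * helpers above (the receiving side decodes it accordingly):
 *
 *	bits 0..3  bitmap encoding, enum drbd_bitmap_code (dcbp_set_code)
 *	bits 4..6  number of unused pad bits in the last code byte
 *	           (dcbp_set_pad_bits)
 *	bit  7     whether the bitmap starts with a run of set bits
 *	           (dcbp_set_start)
 */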
1263
1264 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1265                          struct p_compressed_bm *p,
1266                          unsigned int size,
1267                          struct bm_xfer_ctx *c)
1268 {
1269         struct bitstream bs;
1270         unsigned long plain_bits;
1271         unsigned long tmp;
1272         unsigned long rl;
1273         unsigned len;
1274         unsigned toggle;
1275         int bits, use_rle;
1276
1277         /* may we use this feature? */
1278         rcu_read_lock();
1279         use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1280         rcu_read_unlock();
1281         if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1282                 return 0;
1283
1284         if (c->bit_offset >= c->bm_bits)
1285                 return 0; /* nothing to do. */
1286
1287         /* use at most this many bytes */
1288         bitstream_init(&bs, p->code, size, 0);
1289         memset(p->code, 0, size);
1290         /* plain bits covered in this code string */
1291         plain_bits = 0;
1292
1293         /* p->encoding & 0x80 stores whether the first run length is set.
1294          * bit offset is implicit.
1295          * start with toggle == 2 to be able to tell the first iteration */
1296         toggle = 2;
1297
1298         /* see how many plain bits we can stuff into one packet
1299          * using RLE and VLI. */
1300         do {
1301                 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1302                                     : _drbd_bm_find_next(mdev, c->bit_offset);
1303                 if (tmp == -1UL)
1304                         tmp = c->bm_bits;
1305                 rl = tmp - c->bit_offset;
1306
1307                 if (toggle == 2) { /* first iteration */
1308                         if (rl == 0) {
1309                                 /* the first checked bit was set,
1310                                  * store start value, */
1311                                 dcbp_set_start(p, 1);
1312                                 /* but skip encoding of zero run length */
1313                                 toggle = !toggle;
1314                                 continue;
1315                         }
1316                         dcbp_set_start(p, 0);
1317                 }
1318
1319                 /* paranoia: catch zero runlength.
1320                  * can only happen if bitmap is modified while we scan it. */
1321                 if (rl == 0) {
1322                         dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1323                             "t:%u bo:%lu\n", toggle, c->bit_offset);
1324                         return -1;
1325                 }
1326
1327                 bits = vli_encode_bits(&bs, rl);
1328                 if (bits == -ENOBUFS) /* buffer full */
1329                         break;
1330                 if (bits <= 0) {
1331                         dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1332                         return 0;
1333                 }
1334
1335                 toggle = !toggle;
1336                 plain_bits += rl;
1337                 c->bit_offset = tmp;
1338         } while (c->bit_offset < c->bm_bits);
1339
1340         len = bs.cur.b - p->code + !!bs.cur.bit;
1341
1342         if (plain_bits < (len << 3)) {
1343                 /* incompressible with this method.
1344                  * we need to rewind both word and bit position. */
1345                 c->bit_offset -= plain_bits;
1346                 bm_xfer_ctx_bit_to_word_offset(c);
1347                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1348                 return 0;
1349         }
1350
1351         /* RLE + VLI was able to compress it just fine.
1352          * update c->word_offset. */
1353         bm_xfer_ctx_bit_to_word_offset(c);
1354
1355         /* store pad_bits */
1356         dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
1357
1358         return len;
1359 }
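/*
 * A quick worked example of the break-even test above: if the RLE+VLI code
 * string came out at len = 40 bytes, it must describe more than
 * 40 << 3 = 320 plain bitmap bits to be worth sending.  A mostly uniform
 * bitmap region easily covers millions of bits in those 40 bytes, while a
 * noisy region falls below the threshold and is sent as a plain P_BITMAP
 * packet by send_bitmap_rle_or_plain() below instead.
 */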
1360
1361 /**
1362  * send_bitmap_rle_or_plain
1363  *
1364  * Return 0 when done, 1 when another iteration is needed, and a negative error
1365  * code upon failure.
1366  */
1367 static int
1368 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1369 {
1370         struct drbd_socket *sock = &mdev->tconn->data;
1371         unsigned int header_size = drbd_header_size(mdev->tconn);
1372         struct p_compressed_bm *p = sock->sbuf + header_size;
1373         int len, err;
1374
1375         len = fill_bitmap_rle_bits(mdev, p,
1376                         DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1377         if (len < 0)
1378                 return -EIO;
1379
1380         if (len) {
1381                 dcbp_set_code(p, RLE_VLI_Bits);
1382                 err = __send_command(mdev->tconn, mdev->vnr, sock,
1383                                      P_COMPRESSED_BITMAP, sizeof(*p) + len,
1384                                      NULL, 0);
1385                 c->packets[0]++;
1386                 c->bytes[0] += header_size + sizeof(*p) + len;
1387
1388                 if (c->bit_offset >= c->bm_bits)
1389                         len = 0; /* DONE */
1390         } else {
1391                 /* was not compressible.
1392                  * send a buffer full of plain text bits instead. */
1393                 unsigned int data_size;
1394                 unsigned long num_words;
1395                 unsigned long *p = sock->sbuf + header_size;
1396
1397                 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1398                 num_words = min_t(size_t, data_size / sizeof(*p),
1399                                   c->bm_words - c->word_offset);
1400                 len = num_words * sizeof(*p);
1401                 if (len)
1402                         drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1403                 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1404                 c->word_offset += num_words;
1405                 c->bit_offset = c->word_offset * BITS_PER_LONG;
1406
1407                 c->packets[1]++;
1408                 c->bytes[1] += header_size + len;
1409
1410                 if (c->bit_offset > c->bm_bits)
1411                         c->bit_offset = c->bm_bits;
1412         }
1413         if (!err) {
1414                 if (len == 0) {
1415                         INFO_bm_xfer_stats(mdev, "send", c);
1416                         return 0;
1417                 } else
1418                         return 1;
1419         }
1420         return -EIO;
1421 }
1422
1423 /* See the comment at receive_bitmap() */
1424 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1425 {
1426         struct bm_xfer_ctx c;
1427         int err;
1428
1429         if (!expect(mdev->bitmap))
1430                 return false;
1431
1432         if (get_ldev(mdev)) {
1433                 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1434                         dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1435                         drbd_bm_set_all(mdev);
1436                         if (drbd_bm_write(mdev)) {
1437                                 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1438                                  * but otherwise process as per normal - need to tell other
1439                                  * side that a full resync is required! */
1440                                 dev_err(DEV, "Failed to write bitmap to disk!\n");
1441                         } else {
1442                                 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1443                                 drbd_md_sync(mdev);
1444                         }
1445                 }
1446                 put_ldev(mdev);
1447         }
1448
1449         c = (struct bm_xfer_ctx) {
1450                 .bm_bits = drbd_bm_bits(mdev),
1451                 .bm_words = drbd_bm_words(mdev),
1452         };
1453
1454         do {
1455                 err = send_bitmap_rle_or_plain(mdev, &c);
1456         } while (err > 0);
1457
1458         return err == 0;
1459 }
1460
1461 int drbd_send_bitmap(struct drbd_conf *mdev)
1462 {
1463         struct drbd_socket *sock = &mdev->tconn->data;
1464         int err = -1;
1465
1466         mutex_lock(&sock->mutex);
1467         if (sock->socket)
1468                 err = !_drbd_send_bitmap(mdev);
1469         mutex_unlock(&sock->mutex);
1470         return err;
1471 }
1472
1473 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1474 {
1475         struct drbd_socket *sock;
1476         struct p_barrier_ack *p;
1477
1478         if (mdev->state.conn < C_CONNECTED)
1479                 return;
1480
1481         sock = &mdev->tconn->meta;
1482         p = drbd_prepare_command(mdev, sock);
1483         if (!p)
1484                 return;
1485         p->barrier = barrier_nr;
1486         p->set_size = cpu_to_be32(set_size);
1487         drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1488 }
1489
1490 /**
1491  * _drbd_send_ack() - Sends an ack packet
1492  * @mdev:       DRBD device.
1493  * @cmd:        Packet command code.
1494  * @sector:     sector, needs to be in big endian byte order
1495  * @blksize:    size in bytes, needs to be in big endian byte order
1496  * @block_id:   Id, big endian byte order
1497  */
1498 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1499                           u64 sector, u32 blksize, u64 block_id)
1500 {
1501         struct drbd_socket *sock;
1502         struct p_block_ack *p;
1503
1504         if (mdev->state.conn < C_CONNECTED)
1505                 return -EIO;
1506
1507         sock = &mdev->tconn->meta;
1508         p = drbd_prepare_command(mdev, sock);
1509         if (!p)
1510                 return -EIO;
1511         p->sector = sector;
1512         p->block_id = block_id;
1513         p->blksize = blksize;
1514         p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1515         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1516 }
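
/* Byte-order convention (sketch): wrappers that originate values locally,
 * such as drbd_send_ack() and drbd_send_ack_ex() below, convert with
 * cpu_to_be64()/cpu_to_be32() before calling _drbd_send_ack(), while
 * drbd_send_ack_dp() and drbd_send_ack_rp() forward sector/block_id fields
 * that are still in network byte order from the received packet. */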
1517
1518 /* dp->sector and dp->block_id already/still in network byte order,
1519  * data_size is payload size according to dp->head,
1520  * and may need to be corrected for digest size. */
1521 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1522                       struct p_data *dp, int data_size)
1523 {
1524         if (mdev->tconn->peer_integrity_tfm)
1525                 data_size -= crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1526         _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1527                        dp->block_id);
1528 }
1529
1530 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1531                       struct p_block_req *rp)
1532 {
1533         _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1534 }
1535
1536 /**
1537  * drbd_send_ack() - Sends an ack packet
1538  * @mdev:       DRBD device
1539  * @cmd:        packet command code
1540  * @peer_req:   peer request
1541  */
1542 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1543                   struct drbd_peer_request *peer_req)
1544 {
1545         return _drbd_send_ack(mdev, cmd,
1546                               cpu_to_be64(peer_req->i.sector),
1547                               cpu_to_be32(peer_req->i.size),
1548                               peer_req->block_id);
1549 }
1550
1551 /* This function misuses the block_id field to signal if the blocks
1552  * are in sync or not. */
1553 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1554                      sector_t sector, int blksize, u64 block_id)
1555 {
1556         return _drbd_send_ack(mdev, cmd,
1557                               cpu_to_be64(sector),
1558                               cpu_to_be32(blksize),
1559                               cpu_to_be64(block_id));
1560 }
1561
1562 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1563                        sector_t sector, int size, u64 block_id)
1564 {
1565         struct drbd_socket *sock;
1566         struct p_block_req *p;
1567
1568         sock = &mdev->tconn->data;
1569         p = drbd_prepare_command(mdev, sock);
1570         if (!p)
1571                 return -EIO;
1572         p->sector = cpu_to_be64(sector);
1573         p->block_id = block_id;
1574         p->blksize = cpu_to_be32(size);
1575         return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1576 }
1577
1578 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1579                             void *digest, int digest_size, enum drbd_packet cmd)
1580 {
1581         struct drbd_socket *sock;
1582         struct p_block_req *p;
1583
1584         /* FIXME: Put the digest into the preallocated socket buffer.  */
1585
1586         sock = &mdev->tconn->data;
1587         p = drbd_prepare_command(mdev, sock);
1588         if (!p)
1589                 return -EIO;
1590         p->sector = cpu_to_be64(sector);
1591         p->block_id = ID_SYNCER /* unused */;
1592         p->blksize = cpu_to_be32(size);
1593         return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1594                                  digest, digest_size);
1595 }
1596
1597 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1598 {
1599         struct drbd_socket *sock;
1600         struct p_block_req *p;
1601
1602         sock = &mdev->tconn->data;
1603         p = drbd_prepare_command(mdev, sock);
1604         if (!p)
1605                 return -EIO;
1606         p->sector = cpu_to_be64(sector);
1607         p->block_id = ID_SYNCER /* unused */;
1608         p->blksize = cpu_to_be32(size);
1609         return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1610 }
1611
1612 /* called on sndtimeo
1613  * returns false if we should retry,
1614  * true if we think connection is dead
1615  */
1616 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1617 {
1618         int drop_it;
1619         /* long elapsed = (long)(jiffies - mdev->last_received); */
1620
1621         drop_it =   tconn->meta.socket == sock
1622                 || !tconn->asender.task
1623                 || get_t_state(&tconn->asender) != RUNNING
1624                 || tconn->cstate < C_WF_REPORT_PARAMS;
1625
1626         if (drop_it)
1627                 return true;
1628
1629         drop_it = !--tconn->ko_count;
1630         if (!drop_it) {
1631                 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1632                          current->comm, current->pid, tconn->ko_count);
1633                 request_ping(tconn);
1634         }
1635
1636         return drop_it; /* && (mdev->state == R_PRIMARY) */
1637 }
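
/* Illustrative timeline (sketch): with a configured ko-count of, say, 7,
 * each send timeout on the data socket that gets here decrements ko_count;
 * while the counter is still non-zero we just log and request a ping, and
 * only once it reaches zero is the connection considered dead.  Note that
 * drbd_send() reloads the counter from net_conf->ko_count before it starts
 * sending on the data socket. */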
1638
1639 static void drbd_update_congested(struct drbd_tconn *tconn)
1640 {
1641         struct sock *sk = tconn->data.socket->sk;
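        /* consider the data socket congested once more than 80% (4/5) of
         * its send buffer is already queued up */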
1642         if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1643                 set_bit(NET_CONGESTED, &tconn->flags);
1644 }
1645
1646 /* The idea of sendpage seems to be to put some kind of reference
1647  * to the page into the skb, and to hand it over to the NIC. In
1648  * this process get_page() gets called.
1649  *
1650  * As soon as the page was really sent over the network put_page()
1651  * gets called by some part of the network layer. [ NIC driver? ]
1652  *
1653  * [ get_page() / put_page() increment/decrement the count. If count
1654  *   reaches 0 the page will be freed. ]
1655  *
1656  * This works nicely with pages from FSs.
1657  * But this means that in protocol A we might signal IO completion too early!
1658  *
1659  * In order not to corrupt data during a resync we must make sure
1660  * that we do not reuse our own buffer pages (EEs) too early, therefore
1661  * we have the net_ee list.
1662  *
1663  * XFS still seems to have problems: it submits pages with page_count == 0!
1664  * As a workaround, we disable sendpage on pages
1665  * with page_count == 0 or PageSlab.
1666  */
1667 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1668                               int offset, size_t size, unsigned msg_flags)
1669 {
1670         struct socket *socket;
1671         void *addr;
1672         int err;
1673
1674         socket = mdev->tconn->data.socket;
1675         addr = kmap(page) + offset;
1676         err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1677         kunmap(page);
1678         if (!err)
1679                 mdev->send_cnt += size >> 9;
1680         return err;
1681 }
1682
1683 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1684                     int offset, size_t size, unsigned msg_flags)
1685 {
1686         struct socket *socket = mdev->tconn->data.socket;
1687         mm_segment_t oldfs = get_fs();
1688         int len = size;
1689         int err = -EIO;
1690
1691         /* e.g. XFS meta- & log-data is in slab pages, which have a
1692          * page_count of 0 and/or have PageSlab() set.
1693          * we cannot use send_page for those, as that does get_page();
1694          * put_page(); and would cause either a VM_BUG directly, or
1695          * __page_cache_release a page that would actually still be referenced
1696          * by someone, leading to some obscure delayed Oops somewhere else. */
1697         if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1698                 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1699
1700         msg_flags |= MSG_NOSIGNAL;
1701         drbd_update_congested(mdev->tconn);
1702         set_fs(KERNEL_DS);
1703         do {
1704                 int sent;
1705
1706                 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1707                 if (sent <= 0) {
1708                         if (sent == -EAGAIN) {
1709                                 if (we_should_drop_the_connection(mdev->tconn, socket))
1710                                         break;
1711                                 continue;
1712                         }
1713                         dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1714                              __func__, (int)size, len, sent);
1715                         if (sent < 0)
1716                                 err = sent;
1717                         break;
1718                 }
1719                 len    -= sent;
1720                 offset += sent;
1721         } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1722         set_fs(oldfs);
1723         clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1724
1725         if (len == 0) {
1726                 err = 0;
1727                 mdev->send_cnt += size >> 9;
1728         }
1729         return err;
1730 }
1731
1732 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1733 {
1734         struct bio_vec *bvec;
1735         int i;
1736         /* hint all but last page with MSG_MORE */
1737         __bio_for_each_segment(bvec, bio, i, 0) {
1738                 int err;
1739
1740                 err = _drbd_no_send_page(mdev, bvec->bv_page,
1741                                          bvec->bv_offset, bvec->bv_len,
1742                                          i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1743                 if (err)
1744                         return err;
1745         }
1746         return 0;
1747 }
1748
1749 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1750 {
1751         struct bio_vec *bvec;
1752         int i;
1753         /* hint all but last page with MSG_MORE */
1754         __bio_for_each_segment(bvec, bio, i, 0) {
1755                 int err;
1756
1757                 err = _drbd_send_page(mdev, bvec->bv_page,
1758                                       bvec->bv_offset, bvec->bv_len,
1759                                       i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1760                 if (err)
1761                         return err;
1762         }
1763         return 0;
1764 }
1765
1766 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1767                             struct drbd_peer_request *peer_req)
1768 {
1769         struct page *page = peer_req->pages;
1770         unsigned len = peer_req->i.size;
1771         int err;
1772
1773         /* hint all but last page with MSG_MORE */
1774         page_chain_for_each(page) {
1775                 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1776
1777                 err = _drbd_send_page(mdev, page, 0, l,
1778                                       page_chain_next(page) ? MSG_MORE : 0);
1779                 if (err)
1780                         return err;
1781                 len -= l;
1782         }
1783         return 0;
1784 }
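
/* All three helpers above hint every page except the last one with
 * MSG_MORE, so the TCP layer may coalesce the pieces of a single request
 * into fewer segments instead of pushing each page on its own. */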
1785
1786 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1787 {
1788         if (mdev->tconn->agreed_pro_version >= 95)
1789                 return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1790                         (bi_rw & REQ_FUA ? DP_FUA : 0) |
1791                         (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1792                         (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1793         else
1794                 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
1795 }
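
/* Example of the mapping above (sketch): with an agreed protocol version
 * of 95 or newer, a bio carrying REQ_SYNC|REQ_FUA is announced on the wire
 * as DP_RW_SYNC|DP_FUA; an older peer only ever sees DP_RW_SYNC, since
 * FUA/FLUSH/DISCARD have no wire representation before protocol 95. */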
1796
1797 /* Used to send write requests
1798  * R_PRIMARY -> Peer    (P_DATA)
1799  */
1800 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1801 {
1802         struct drbd_socket *sock;
1803         struct p_data *p;
1804         unsigned int dp_flags = 0;
1805         int dgs;
1806         int err;
1807
1808         sock = &mdev->tconn->data;
1809         p = drbd_prepare_command(mdev, sock);
1810         dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1811
1812         if (!p)
1813                 return -EIO;
1814         p->sector = cpu_to_be64(req->i.sector);
1815         p->block_id = (unsigned long)req;
1816         p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1817         dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1818         if (mdev->state.conn >= C_SYNC_SOURCE &&
1819             mdev->state.conn <= C_PAUSED_SYNC_T)
1820                 dp_flags |= DP_MAY_SET_IN_SYNC;
1821         if (mdev->tconn->agreed_pro_version >= 100) {
1822                 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1823                         dp_flags |= DP_SEND_RECEIVE_ACK;
1824                 if (req->rq_state & RQ_EXP_WRITE_ACK)
1825                         dp_flags |= DP_SEND_WRITE_ACK;
1826         }
1827         p->dp_flags = cpu_to_be32(dp_flags);
1828         if (dgs)
1829                 drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, p + 1);
1830         err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1831         if (!err) {
1832                 /* For protocol A, we have to memcpy the payload into
1833                  * socket buffers, as we may complete right away
1834                  * as soon as we handed it over to tcp, at which point the data
1835                  * pages may become invalid.
1836                  *
1837                  * When data integrity is enabled, we copy it as well, so we can
1838                  * be sure that even if the bio pages are still being modified,
1839                  * the data on the wire does not change; thus if the digest checks
1840                  * out ok after sending on this side but does not match on the
1841                  * receiving side, we know the corruption happened elsewhere.
1842                  */
1843                 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1844                         err = _drbd_send_bio(mdev, req->master_bio);
1845                 else
1846                         err = _drbd_send_zc_bio(mdev, req->master_bio);
1847
1848                 /* double check digest, sometimes buffers have been modified in flight. */
1849                 if (dgs > 0 && dgs <= 64) {
1850                         /* 64 bytes (512 bits) is the largest digest size
1851                          * currently supported in kernel crypto. */
1852                         unsigned char digest[64];
1853                         drbd_csum_bio(mdev, mdev->tconn->integrity_tfm, req->master_bio, digest);
1854                         if (memcmp(p + 1, digest, dgs)) {
1855                                 dev_warn(DEV,
1856                                         "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1857                                         (unsigned long long)req->i.sector, req->i.size);
1858                         }
1859                 } /* else if (dgs > 64) {
1860                      ... Be noisy about digest too large ...
1861                 } */
1862         }
1863         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1864
1865         return err;
1866 }
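
/* Rough on-the-wire layout of the P_DATA packet assembled above (sketch):
 *
 *   [ packet header | struct p_data | integrity digest (dgs bytes, if any) | payload (req->i.size bytes) ]
 *
 * __send_command() transmits the header, p_data and digest from the
 * preallocated socket buffer; the payload itself is streamed afterwards by
 * _drbd_send_bio() or _drbd_send_zc_bio(). */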
1867
1868 /* answer packet, used to send data back for read requests:
1869  *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
1870  *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
1871  */
1872 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1873                     struct drbd_peer_request *peer_req)
1874 {
1875         struct drbd_socket *sock;
1876         struct p_data *p;
1877         int err;
1878         int dgs;
1879
1880         sock = &mdev->tconn->data;
1881         p = drbd_prepare_command(mdev, sock);
1882
1883         dgs = mdev->tconn->integrity_tfm ? crypto_hash_digestsize(mdev->tconn->integrity_tfm) : 0;
1884
1885         if (!p)
1886                 return -EIO;
1887         p->sector = cpu_to_be64(peer_req->i.sector);
1888         p->block_id = peer_req->block_id;
1889         p->seq_num = 0;  /* unused */
1890         if (dgs)
1891                 drbd_csum_ee(mdev, mdev->tconn->integrity_tfm, peer_req, p + 1);
1892         err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1893         if (!err)
1894                 err = _drbd_send_zc_ee(mdev, peer_req);
1895         mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
1896
1897         return err;
1898 }
1899
1900 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1901 {
1902         struct drbd_socket *sock;
1903         struct p_block_desc *p;
1904
1905         sock = &mdev->tconn->data;
1906         p = drbd_prepare_command(mdev, sock);
1907         if (!p)
1908                 return -EIO;
1909         p->sector = cpu_to_be64(req->i.sector);
1910         p->blksize = cpu_to_be32(req->i.size);
1911         return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1912 }
1913
1914 /*
1915   drbd_send distinguishes two cases:
1916
1917   Packets sent via the data socket "sock"
1918   and packets sent via the meta data socket "msock"
1919
1920                     sock                      msock
1921   -----------------+-------------------------+------------------------------
1922   timeout           conf.timeout / 2          conf.timeout / 2
1923   timeout action    send a ping via msock     Abort communication
1924                                               and close all sockets
1925 */
1926
1927 /*
1928  * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1929  */
1930 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1931               void *buf, size_t size, unsigned msg_flags)
1932 {
1933         struct kvec iov;
1934         struct msghdr msg;
1935         int rv, sent = 0;
1936
1937         if (!sock)
1938                 return -EBADR;
1939
1940         /* THINK  if (signal_pending) return ... ? */
1941
1942         iov.iov_base = buf;
1943         iov.iov_len  = size;
1944
1945         msg.msg_name       = NULL;
1946         msg.msg_namelen    = 0;
1947         msg.msg_control    = NULL;
1948         msg.msg_controllen = 0;
1949         msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
1950
1951         if (sock == tconn->data.socket) {
1952                 rcu_read_lock();
1953                 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1954                 rcu_read_unlock();
1955                 drbd_update_congested(tconn);
1956         }
1957         do {
1958                 /* STRANGE
1959                  * tcp_sendmsg does _not_ use its size parameter at all ?
1960                  *
1961                  * -EAGAIN on timeout, -EINTR on signal.
1962                  */
1963 /* THINK
1964  * do we need to block DRBD_SIG if sock == &meta.socket ??
1965  * otherwise wake_asender() might interrupt some send_*Ack !
1966  */
1967                 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1968                 if (rv == -EAGAIN) {
1969                         if (we_should_drop_the_connection(tconn, sock))
1970                                 break;
1971                         else
1972                                 continue;
1973                 }
1974                 if (rv == -EINTR) {
1975                         flush_signals(current);
1976                         rv = 0;
1977                 }
1978                 if (rv < 0)
1979                         break;
1980                 sent += rv;
1981                 iov.iov_base += rv;
1982                 iov.iov_len  -= rv;
1983         } while (sent < size);
1984
1985         if (sock == tconn->data.socket)
1986                 clear_bit(NET_CONGESTED, &tconn->flags);
1987
1988         if (rv <= 0) {
1989                 if (rv != -EAGAIN) {
1990                         conn_err(tconn, "%s_sendmsg returned %d\n",
1991                                  sock == tconn->meta.socket ? "msock" : "sock",
1992                                  rv);
1993                         conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1994                 } else
1995                         conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1996         }
1997
1998         return sent;
1999 }
2000
2001 /**
2002  * drbd_send_all  -  Send an entire buffer
2003  *
2004  * Returns 0 upon success and a negative error value otherwise.
2005  */
2006 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
2007                   size_t size, unsigned msg_flags)
2008 {
2009         int err;
2010
2011         err = drbd_send(tconn, sock, buffer, size, msg_flags);
2012         if (err < 0)
2013                 return err;
2014         if (err != size)
2015                 return -EIO;
2016         return 0;
2017 }
2018
2019 static int drbd_open(struct block_device *bdev, fmode_t mode)
2020 {
2021         struct drbd_conf *mdev = bdev->bd_disk->private_data;
2022         unsigned long flags;
2023         int rv = 0;
2024
2025         mutex_lock(&drbd_main_mutex);
2026         spin_lock_irqsave(&mdev->tconn->req_lock, flags);
2027         /* to have a stable mdev->state.role
2028          * and no race with updating open_cnt */
2029
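        /* Writable opens require the Primary role (-EROFS otherwise);
         * read-only opens of a Secondary are refused with -EMEDIUMTYPE
         * unless the allow_oos module parameter is set. */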
2030         if (mdev->state.role != R_PRIMARY) {
2031                 if (mode & FMODE_WRITE)
2032                         rv = -EROFS;
2033                 else if (!allow_oos)
2034                         rv = -EMEDIUMTYPE;
2035         }
2036
2037         if (!rv)
2038                 mdev->open_cnt++;
2039         spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
2040         mutex_unlock(&drbd_main_mutex);
2041
2042         return rv;
2043 }
2044
2045 static int drbd_release(struct gendisk *gd, fmode_t mode)
2046 {
2047         struct drbd_conf *mdev = gd->private_data;
2048         mutex_lock(&drbd_main_mutex);
2049         mdev->open_cnt--;
2050         mutex_unlock(&drbd_main_mutex);
2051         return 0;
2052 }
2053
2054 static void drbd_set_defaults(struct drbd_conf *mdev)
2055 {
2056         /* Beware! The actual layout differs
2057          * between big endian and little endian */
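        /* (The union overlays these bitfields with a plain integer, and
         * bitfield ordering is endian dependent, which is why the state is
         * initialized through the named fields here rather than by writing
         * the integer member directly.) */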
2058         mdev->state = (union drbd_dev_state) {
2059                 { .role = R_SECONDARY,
2060                   .peer = R_UNKNOWN,
2061                   .conn = C_STANDALONE,
2062                   .disk = D_DISKLESS,
2063                   .pdsk = D_UNKNOWN,
2064                 } };
2065 }
2066
2067 void drbd_init_set_defaults(struct drbd_conf *mdev)
2068 {
2069         /* the memset(,0,) did most of this.
2070          * note: only assignments, no allocation in here */
2071
2072         drbd_set_defaults(mdev);
2073
2074         atomic_set(&mdev->ap_bio_cnt, 0);
2075         atomic_set(&mdev->ap_pending_cnt, 0);
2076         atomic_set(&mdev->rs_pending_cnt, 0);
2077         atomic_set(&mdev->unacked_cnt, 0);
2078         atomic_set(&mdev->local_cnt, 0);
2079         atomic_set(&mdev->pp_in_use_by_net, 0);
2080         atomic_set(&mdev->rs_sect_in, 0);
2081         atomic_set(&mdev->rs_sect_ev, 0);
2082         atomic_set(&mdev->ap_in_flight, 0);
2083         atomic_set(&mdev->md_io_in_use, 0);
2084
2085         mutex_init(&mdev->own_state_mutex);
2086         mdev->state_mutex = &mdev->own_state_mutex;
2087
2088         spin_lock_init(&mdev->al_lock);
2089         spin_lock_init(&mdev->peer_seq_lock);
2090         spin_lock_init(&mdev->epoch_lock);
2091
2092         INIT_LIST_HEAD(&mdev->active_ee);
2093         INIT_LIST_HEAD(&mdev->sync_ee);
2094         INIT_LIST_HEAD(&mdev->done_ee);
2095         INIT_LIST_HEAD(&mdev->read_ee);
2096         INIT_LIST_HEAD(&mdev->net_ee);
2097         INIT_LIST_HEAD(&mdev->resync_reads);
2098         INIT_LIST_HEAD(&mdev->resync_work.list);
2099         INIT_LIST_HEAD(&mdev->unplug_work.list);
2100         INIT_LIST_HEAD(&mdev->go_diskless.list);
2101         INIT_LIST_HEAD(&mdev->md_sync_work.list);
2102         INIT_LIST_HEAD(&mdev->start_resync_work.list);
2103         INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2104
2105         mdev->resync_work.cb  = w_resync_timer;
2106         mdev->unplug_work.cb  = w_send_write_hint;
2107         mdev->go_diskless.cb  = w_go_diskless;
2108         mdev->md_sync_work.cb = w_md_sync;
2109         mdev->bm_io_work.w.cb = w_bitmap_io;
2110         mdev->start_resync_work.cb = w_start_resync;
2111
2112         mdev->resync_work.mdev  = mdev;
2113         mdev->unplug_work.mdev  = mdev;
2114         mdev->go_diskless.mdev  = mdev;
2115         mdev->md_sync_work.mdev = mdev;
2116         mdev->bm_io_work.w.mdev = mdev;
2117         mdev->start_resync_work.mdev = mdev;
2118
2119         init_timer(&mdev->resync_timer);
2120         init_timer(&mdev->md_sync_timer);
2121         init_timer(&mdev->start_resync_timer);
2122         init_timer(&mdev->request_timer);
2123         mdev->resync_timer.function = resync_timer_fn;
2124         mdev->resync_timer.data = (unsigned long) mdev;
2125         mdev->md_sync_timer.function = md_sync_timer_fn;
2126         mdev->md_sync_timer.data = (unsigned long) mdev;
2127         mdev->start_resync_timer.function = start_resync_timer_fn;
2128         mdev->start_resync_timer.data = (unsigned long) mdev;
2129         mdev->request_timer.function = request_timer_fn;
2130         mdev->request_timer.data = (unsigned long) mdev;
2131
2132         init_waitqueue_head(&mdev->misc_wait);
2133         init_waitqueue_head(&mdev->state_wait);
2134         init_waitqueue_head(&mdev->ee_wait);
2135         init_waitqueue_head(&mdev->al_wait);
2136         init_waitqueue_head(&mdev->seq_wait);
2137
2138         mdev->write_ordering = WO_bdev_flush;
2139         mdev->resync_wenr = LC_FREE;
2140         mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2141         mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2142 }
2143
2144 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2145 {
2146         int i;
2147         if (mdev->tconn->receiver.t_state != NONE)
2148                 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2149                                 mdev->tconn->receiver.t_state);
2150
2151         /* no need to lock it, I'm the only thread alive */
2152         if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2153                 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2154         mdev->al_writ_cnt  =
2155         mdev->bm_writ_cnt  =
2156         mdev->read_cnt     =
2157         mdev->recv_cnt     =
2158         mdev->send_cnt     =
2159         mdev->writ_cnt     =
2160         mdev->p_size       =
2161         mdev->rs_start     =
2162         mdev->rs_total     =
2163         mdev->rs_failed    = 0;
2164         mdev->rs_last_events = 0;
2165         mdev->rs_last_sect_ev = 0;
2166         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2167                 mdev->rs_mark_left[i] = 0;
2168                 mdev->rs_mark_time[i] = 0;
2169         }
2170         D_ASSERT(mdev->tconn->net_conf == NULL);
2171
2172         drbd_set_my_capacity(mdev, 0);
2173         if (mdev->bitmap) {
2174                 /* maybe never allocated. */
2175                 drbd_bm_resize(mdev, 0, 1);
2176                 drbd_bm_cleanup(mdev);
2177         }
2178
2179         drbd_free_bc(mdev->ldev);
2180         mdev->ldev = NULL;
2181
2182         clear_bit(AL_SUSPENDED, &mdev->flags);
2183
2184         D_ASSERT(list_empty(&mdev->active_ee));
2185         D_ASSERT(list_empty(&mdev->sync_ee));
2186         D_ASSERT(list_empty(&mdev->done_ee));
2187         D_ASSERT(list_empty(&mdev->read_ee));
2188         D_ASSERT(list_empty(&mdev->net_ee));
2189         D_ASSERT(list_empty(&mdev->resync_reads));
2190         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2191         D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2192         D_ASSERT(list_empty(&mdev->resync_work.list));
2193         D_ASSERT(list_empty(&mdev->unplug_work.list));
2194         D_ASSERT(list_empty(&mdev->go_diskless.list));
2195
2196         drbd_set_defaults(mdev);
2197 }
2198
2199
2200 static void drbd_destroy_mempools(void)
2201 {
2202         struct page *page;
2203
2204         while (drbd_pp_pool) {
2205                 page = drbd_pp_pool;
2206                 drbd_pp_pool = (struct page *)page_private(page);
2207                 __free_page(page);
2208                 drbd_pp_vacant--;
2209         }
2210
2211         /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2212
2213         if (drbd_md_io_bio_set)
2214                 bioset_free(drbd_md_io_bio_set);
2215         if (drbd_md_io_page_pool)
2216                 mempool_destroy(drbd_md_io_page_pool);
2217         if (drbd_ee_mempool)
2218                 mempool_destroy(drbd_ee_mempool);
2219         if (drbd_request_mempool)
2220                 mempool_destroy(drbd_request_mempool);
2221         if (drbd_ee_cache)
2222                 kmem_cache_destroy(drbd_ee_cache);
2223         if (drbd_request_cache)
2224                 kmem_cache_destroy(drbd_request_cache);
2225         if (drbd_bm_ext_cache)
2226                 kmem_cache_destroy(drbd_bm_ext_cache);
2227         if (drbd_al_ext_cache)
2228                 kmem_cache_destroy(drbd_al_ext_cache);
2229
2230         drbd_md_io_bio_set   = NULL;
2231         drbd_md_io_page_pool = NULL;
2232         drbd_ee_mempool      = NULL;
2233         drbd_request_mempool = NULL;
2234         drbd_ee_cache        = NULL;
2235         drbd_request_cache   = NULL;
2236         drbd_bm_ext_cache    = NULL;
2237         drbd_al_ext_cache    = NULL;
2238
2239         return;
2240 }
2241
2242 static int drbd_create_mempools(void)
2243 {
2244         struct page *page;
2245         const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
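        /* Rough sizing example (sketch; the exact constants depend on the
         * build and on the minor_count module parameter): with a 1 MiB
         * DRBD_MAX_BIO_SIZE, 4 KiB pages and minor_count == 32 this comes
         * to 256 * 32 = 8192 pages, i.e. a 32 MiB page pool. */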
2246         int i;
2247
2248         /* prepare our caches and mempools */
2249         drbd_request_mempool = NULL;
2250         drbd_ee_cache        = NULL;
2251         drbd_request_cache   = NULL;
2252         drbd_bm_ext_cache    = NULL;
2253         drbd_al_ext_cache    = NULL;
2254         drbd_pp_pool         = NULL;
2255         drbd_md_io_page_pool = NULL;
2256         drbd_md_io_bio_set   = NULL;
2257
2258         /* caches */
2259         drbd_request_cache = kmem_cache_create(
2260                 "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2261         if (drbd_request_cache == NULL)
2262                 goto Enomem;
2263
2264         drbd_ee_cache = kmem_cache_create(
2265                 "drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
2266         if (drbd_ee_cache == NULL)
2267                 goto Enomem;
2268
2269         drbd_bm_ext_cache = kmem_cache_create(
2270                 "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2271         if (drbd_bm_ext_cache == NULL)
2272                 goto Enomem;
2273
2274         drbd_al_ext_cache = kmem_cache_create(
2275                 "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2276         if (drbd_al_ext_cache == NULL)
2277                 goto Enomem;
2278
2279         /* mempools */
2280         drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
2281         if (drbd_md_io_bio_set == NULL)
2282                 goto Enomem;
2283
2284         drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
2285         if (drbd_md_io_page_pool == NULL)
2286                 goto Enomem;
2287
2288         drbd_request_mempool = mempool_create(number,
2289                 mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2290         if (drbd_request_mempool == NULL)
2291                 goto Enomem;
2292
2293         drbd_ee_mempool = mempool_create(number,
2294                 mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2295         if (drbd_ee_mempool == NULL)
2296                 goto Enomem;
2297
2298         /* drbd's page pool */
2299         spin_lock_init(&drbd_pp_lock);
2300
2301         for (i = 0; i < number; i++) {
2302                 page = alloc_page(GFP_HIGHUSER);
2303                 if (!page)
2304                         goto Enomem;
2305                 set_page_private(page, (unsigned long)drbd_pp_pool);
2306                 drbd_pp_pool = page;
2307         }
2308         drbd_pp_vacant = number;
2309
2310         return 0;
2311
2312 Enomem:
2313         drbd_destroy_mempools(); /* in case we allocated some */
2314         return -ENOMEM;
2315 }
2316
2317 static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
2318         void *unused)
2319 {
2320         /* just so we have it.  you never know what interesting things we
2321          * might want to do here some day...
2322          */
2323
2324         return NOTIFY_DONE;
2325 }
2326
2327 static struct notifier_block drbd_notifier = {
2328         .notifier_call = drbd_notify_sys,
2329 };
2330
2331 static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
2332 {
2333         int rr;
2334
2335         rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
2336         if (rr)
2337                 dev_err(DEV, "%d EEs in active list found!\n", rr);
2338
2339         rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
2340         if (rr)
2341                 dev_err(DEV, "%d EEs in sync list found!\n", rr);
2342
2343         rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
2344         if (rr)
2345                 dev_err(DEV, "%d EEs in read list found!\n", rr);
2346
2347         rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
2348         if (rr)
2349                 dev_err(DEV, "%d EEs in done list found!\n", rr);
2350
2351         rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
2352         if (rr)
2353                 dev_err(DEV, "%d EEs in net list found!\n", rr);
2354 }
2355
2356 /* caution. no locking. */
2357 void drbd_minor_destroy(struct kref *kref)
2358 {
2359         struct drbd_conf *mdev = container_of(kref, struct drbd_conf, kref);
2360         struct drbd_tconn *tconn = mdev->tconn;
2361
2362         del_timer_sync(&mdev->request_timer);
2363
2364         /* paranoia asserts */
2365         D_ASSERT(mdev->open_cnt == 0);
2366         D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2367         /* end paranoia asserts */
2368
2369         /* cleanup stuff that may have been allocated during
2370          * device (re-)configuration or state changes */
2371
2372         if (mdev->this_bdev)
2373                 bdput(mdev->this_bdev);
2374
2375         drbd_free_bc(mdev->ldev);
2376         mdev->ldev = NULL;
2377
2378         drbd_release_all_peer_reqs(mdev);
2379
2380         lc_destroy(mdev->act_log);
2381         lc_destroy(mdev->resync);
2382
2383         kfree(mdev->p_uuid);
2384         /* mdev->p_uuid = NULL; */
2385
2386         kfree(mdev->current_epoch);
2387         if (mdev->bitmap) /* should no longer be there. */
2388                 drbd_bm_cleanup(mdev);
2389         __free_page(mdev->md_io_page);
2390         put_disk(mdev->vdisk);
2391         blk_cleanup_queue(mdev->rq_queue);
2392         kfree(mdev->rs_plan_s);
2393         kfree(mdev);
2394
2395         kref_put(&tconn->kref, &conn_destroy);
2396 }
2397
2398 static void drbd_cleanup(void)
2399 {
2400         unsigned int i;
2401         struct drbd_conf *mdev;
2402         struct drbd_tconn *tconn, *tmp;
2403
2404         unregister_reboot_notifier(&drbd_notifier);
2405
2406         /* first remove proc,
2407          * drbdsetup uses its presence to detect
2408          * whether DRBD is loaded.
2409          * If we got stuck in proc removal
2410          * but had netlink already deregistered,
2411          * some drbdsetup commands might wait forever
2412          * for an answer.
2413          */
2414         if (drbd_proc)
2415                 remove_proc_entry("drbd", NULL);
2416
2417         drbd_genl_unregister();
2418
2419         idr_for_each_entry(&minors, mdev, i) {
2420                 idr_remove(&minors, mdev_to_minor(mdev));
2421                 idr_remove(&mdev->tconn->volumes, mdev->vnr);
2422                 del_gendisk(mdev->vdisk);
2423                 /* synchronize_rcu(); No other threads running at this point */
2424                 kref_put(&mdev->kref, &drbd_minor_destroy);
2425         }
2426
2427         /* not _rcu since no other updater exists anymore; genl is already unregistered */
2428         list_for_each_entry_safe(tconn, tmp, &drbd_tconns, all_tconn) {
2429                 list_del(&tconn->all_tconn); /* not _rcu: no proc, no other threads */
2430                 /* synchronize_rcu(); */
2431                 kref_put(&tconn->kref, &conn_destroy);
2432         }
2433
2434         drbd_destroy_mempools();
2435         unregister_blkdev(DRBD_MAJOR, "drbd");
2436
2437         idr_destroy(&minors);
2438
2439         printk(KERN_INFO "drbd: module cleanup done.\n");
2440 }
2441
2442 /**
2443  * drbd_congested() - Callback for pdflush
2444  * @congested_data:     User data
2445  * @bdi_bits:           Bits pdflush is currently interested in
2446  *
2447  * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
2448  */
2449 static int drbd_congested(void *congested_data, int bdi_bits)
2450 {
2451         struct drbd_conf *mdev = congested_data;
2452         struct request_queue *q;
2453         char reason = '-';
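        /* reason legend: '-' not congested, 'd' IO frozen by DRBD,
         * 'b' backing device congested, 'n' network send path congested,
         * 'a' both backing device and network congested */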
2454         int r = 0;
2455
2456         if (!may_inc_ap_bio(mdev)) {
2457                 /* DRBD has frozen IO */
2458                 r = bdi_bits;
2459                 reason = 'd';
2460                 goto out;
2461         }
2462
2463         if (get_ldev(mdev)) {
2464                 q = bdev_get_queue(mdev->ldev->backing_bdev);
2465                 r = bdi_congested(&q->backing_dev_info, bdi_bits);
2466                 put_ldev(mdev);
2467                 if (r)
2468                         reason = 'b';
2469         }
2470
2471         if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
2472                 r |= (1 << BDI_async_congested);
2473                 reason = reason == 'b' ? 'a' : 'n';
2474         }
2475
2476 out:
2477         mdev->congestion_reason = reason;
2478         return r;
2479 }
2480
2481 static void drbd_init_workqueue(struct drbd_work_queue* wq)
2482 {
2483         sema_init(&wq->s, 0);
2484         spin_lock_init(&wq->q_lock);
2485         INIT_LIST_HEAD(&wq->q);
2486 }
2487
2488 struct drbd_tconn *conn_get_by_name(const char *name)
2489 {
2490         struct drbd_tconn *tconn;
2491
2492         if (!name || !name[0])
2493                 return NULL;
2494
2495         rcu_read_lock();
2496         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2497                 if (!strcmp(tconn->name, name)) {
2498                         kref_get(&tconn->kref);
2499                         goto found;
2500                 }
2501         }
2502         tconn = NULL;
2503 found:
2504         rcu_read_unlock();
2505         return tconn;
2506 }
2507
2508 struct drbd_tconn *conn_get_by_addrs(void *my_addr, int my_addr_len,
2509                                      void *peer_addr, int peer_addr_len)
2510 {
2511         struct drbd_tconn *tconn;
2512
2513         rcu_read_lock();
2514         list_for_each_entry_rcu(tconn, &drbd_tconns, all_tconn) {
2515                 if (tconn->my_addr_len == my_addr_len &&
2516                     tconn->peer_addr_len == peer_addr_len &&
2517                     !memcmp(&tconn->my_addr, my_addr, my_addr_len) &&
2518                     !memcmp(&tconn->peer_addr, peer_addr, peer_addr_len)) {
2519                         kref_get(&tconn->kref);
2520                         goto found;
2521                 }
2522         }
2523         tconn = NULL;
2524 found:
2525         rcu_read_unlock();
2526         return tconn;
2527 }
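
/* Usage sketch for the two lookup helpers above -- both take a reference
 * on the connection they return, which the caller has to drop again, e.g.:
 *
 *	struct drbd_tconn *tconn = conn_get_by_name("r0");
 *	if (tconn) {
 *		...
 *		kref_put(&tconn->kref, &conn_destroy);
 *	}
 *
 * ("r0" is just an example resource name.) */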
2528
2529 static int drbd_alloc_socket(struct drbd_socket *socket)
2530 {
2531         socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
2532         if (!socket->rbuf)
2533                 return -ENOMEM;
2534         socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
2535         if (!socket->sbuf)
2536                 return -ENOMEM;
2537         return 0;
2538 }
2539
2540 static void drbd_free_socket(struct drbd_socket *socket)
2541 {
2542         free_page((unsigned long) socket->sbuf);
2543         free_page((unsigned long) socket->rbuf);
2544 }
2545
2546 void conn_free_crypto(struct drbd_tconn *tconn)
2547 {
2548         drbd_free_sock(tconn);
2549
2550         crypto_free_hash(tconn->csums_tfm);
2551         crypto_free_hash(tconn->verify_tfm);
2552         crypto_free_hash(tconn->cram_hmac_tfm);
2553         crypto_free_hash(tconn->integrity_tfm);
2554         crypto_free_hash(tconn->peer_integrity_tfm);
2555         kfree(tconn->int_dig_in);
2556         kfree(tconn->int_dig_vv);
2557
2558         tconn->csums_tfm = NULL;
2559         tconn->verify_tfm = NULL;
2560         tconn->cram_hmac_tfm = NULL;
2561         tconn->integrity_tfm = NULL;
2562         tconn->peer_integrity_tfm = NULL;
2563         tconn->int_dig_in = NULL;
2564         tconn->int_dig_vv = NULL;
2565 }
2566
2567 int set_resource_options(struct drbd_tconn *tconn, struct res_opts *res_opts)
2568 {
2569         cpumask_var_t new_cpu_mask;
2570         int err;
2571
2572         if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
2573                 return -ENOMEM;
2574                 /*
2575                 retcode = ERR_NOMEM;
2576                 drbd_msg_put_info("unable to allocate cpumask");
2577                 */
2578
2579         /* silently ignore cpu mask on UP kernel */
2580         if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
2581                 /* FIXME: Get rid of constant 32 here */
2582                 err = __bitmap_parse(res_opts->cpu_mask, 32, 0,
2583                                 cpumask_bits(new_cpu_mask), nr_cpu_ids);
2584                 if (err) {
2585                         conn_warn(tconn, "__bitmap_parse() failed with %d\n", err);
2586                         /* retcode = ERR_CPU_MASK_PARSE; */
2587                         goto fail;
2588                 }
2589         }
2590         tconn->res_opts = *res_opts;
2591         if (!cpumask_equal(tconn->cpu_mask, new_cpu_mask)) {
2592                 cpumask_copy(tconn->cpu_mask, new_cpu_mask);
2593                 drbd_calc_cpu_mask(tconn);
2594                 tconn->receiver.reset_cpu_mask = 1;
2595                 tconn->asender.reset_cpu_mask = 1;
2596                 tconn->worker.reset_cpu_mask = 1;
2597         }
2598         err = 0;
2599
2600 fail:
2601         free_cpumask_var(new_cpu_mask);
2602         return err;
2603
2604 }
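
/* Example (sketch): __bitmap_parse() above treats res_opts->cpu_mask as a
 * hex string, so a configured cpu-mask of "3" resolves to CPUs 0 and 1.
 * Setting the reset_cpu_mask flags makes the receiver, asender and worker
 * threads re-apply the new mask the next time they get scheduled. */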
2605
2606 /* caller must be under genl_lock() */
2607 struct drbd_tconn *conn_create(const char *name, struct res_opts *res_opts)
2608 {
2609         struct drbd_tconn *tconn;
2610
2611         tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
2612         if (!tconn)
2613                 return NULL;
2614
2615         tconn->name = kstrdup(name, GFP_KERNEL);
2616         if (!tconn->name)
2617                 goto fail;
2618
2619         if (drbd_alloc_socket(&tconn->data))
2620                 goto fail;
2621         if (drbd_alloc_socket(&tconn->meta))
2622                 goto fail;
2623
2624         if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
2625                 goto fail;
2626
2627         if (set_resource_options(tconn, res_opts))
2628                 goto fail;
2629
2630         if (!tl_init(tconn))
2631                 goto fail;
2632
2633         tconn->cstate = C_STANDALONE;
2634         mutex_init(&tconn->cstate_mutex);
2635         spin_lock_init(&tconn->req_lock);
2636         mutex_init(&tconn->conf_update);
2637         init_waitqueue_head(&tconn->ping_wait);
2638         idr_init(&tconn->volumes);
2639
2640         drbd_init_workqueue(&tconn->data.work);
2641         mutex_init(&tconn->data.mutex);
2642
2643         drbd_init_workqueue(&tconn->meta.work);
2644         mutex_init(&tconn->meta.mutex);
2645
2646         drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
2647         drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
2648         drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");
2649
2650         kref_init(&tconn->kref);
2651         list_add_tail_rcu(&tconn->all_tconn, &drbd_tconns);
2652
2653         return tconn;
2654
2655 fail:
2656         tl_cleanup(tconn);
2657         free_cpumask_var(tconn->cpu_mask);
2658         drbd_free_socket(&tconn->meta);
2659         drbd_free_socket(&tconn->data);
2660         kfree(tconn->name);
2661         kfree(tconn);
2662
2663         return NULL;
2664 }
2665
2666 void conn_destroy(struct kref *kref)
2667 {
2668         struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);
2669
2670         idr_destroy(&tconn->volumes);
2671
2672         free_cpumask_var(tconn->cpu_mask);
2673         drbd_free_socket(&tconn->meta);
2674         drbd_free_socket(&tconn->data);
2675         kfree(tconn->name);
2676         kfree(tconn->int_dig_in);
2677         kfree(tconn->int_dig_vv);
2678         kfree(tconn);
2679 }
2680
2681 enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
2682 {
2683         struct drbd_conf *mdev;
2684         struct gendisk *disk;
2685         struct request_queue *q;
2686         int vnr_got = vnr;
2687         int minor_got = minor;
2688         enum drbd_ret_code err = ERR_NOMEM;
2689
2690         mdev = minor_to_mdev(minor);
2691         if (mdev)
2692                 return ERR_MINOR_EXISTS;
2693
2694         /* GFP_KERNEL, we are outside of all write-out paths */
2695         mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
2696         if (!mdev)
2697                 return ERR_NOMEM;
2698
2699         kref_get(&tconn->kref);
2700         mdev->tconn = tconn;
2701
2702         mdev->minor = minor;
2703         mdev->vnr = vnr;
2704
2705         drbd_init_set_defaults(mdev);
2706
2707         q = blk_alloc_queue(GFP_KERNEL);
2708         if (!q)
2709                 goto out_no_q;
2710         mdev->rq_queue = q;
2711         q->queuedata   = mdev;
2712
2713         disk = alloc_disk(1);
2714         if (!disk)
2715                 goto out_no_disk;
2716         mdev->vdisk = disk;
2717
2718         set_disk_ro(disk, true);
2719
2720         disk->queue = q;
2721         disk->major = DRBD_MAJOR;
2722         disk->first_minor = minor;
2723         disk->fops = &drbd_ops;
2724         sprintf(disk->disk_name, "drbd%d", minor);
2725         disk->private_data = mdev;
2726
2727         mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
2728         /* we have no partitions. we contain only ourselves. */
2729         mdev->this_bdev->bd_contains = mdev->this_bdev;
2730
2731         q->backing_dev_info.congested_fn = drbd_congested;
2732         q->backing_dev_info.congested_data = mdev;
2733
2734         blk_queue_make_request(q, drbd_make_request);
2735         /* Set max_hw_sectors to the odd value of 8 KiB here;
2736            this triggers a max_bio_size message upon first attach or connect. */
2737         blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
2738         blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
2739         blk_queue_merge_bvec(q, drbd_merge_bvec);
2740         q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */
2741
2742         mdev->md_io_page = alloc_page(GFP_KERNEL);
2743         if (!mdev->md_io_page)
2744                 goto out_no_io_page;
2745
2746         if (drbd_bm_init(mdev))
2747                 goto out_no_bitmap;
2748         mdev->read_requests = RB_ROOT;
2749         mdev->write_requests = RB_ROOT;
2750
2751         mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
2752         if (!mdev->current_epoch)
2753                 goto out_no_epoch;
2754
2755         INIT_LIST_HEAD(&mdev->current_epoch->list);
2756         mdev->epochs = 1;
2757
2758         if (!idr_pre_get(&minors, GFP_KERNEL))
2759                 goto out_no_minor_idr;
2760         if (idr_get_new_above(&minors, mdev, minor, &minor_got))
2761                 goto out_no_minor_idr;
2762         if (minor_got != minor) {
2763                 err = ERR_MINOR_EXISTS;
2764                 drbd_msg_put_info("requested minor exists already");
2765                 goto out_idr_remove_minor;
2766         }
2767
2768         if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
2769                 goto out_idr_remove_minor;
2770         if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
2771                 goto out_idr_remove_minor;
2772         if (vnr_got != vnr) {
2773                 err = ERR_INVALID_REQUEST;
2774                 drbd_msg_put_info("requested volume exists already");
2775                 goto out_idr_remove_vol;
2776         }
2777         add_disk(disk);
2778         kref_init(&mdev->kref); /* one ref for both idrs and the add_disk */
2779
2780         /* inherit the connection state */
2781         mdev->state.conn = tconn->cstate;
2782         if (mdev->state.conn == C_WF_REPORT_PARAMS)
2783                 drbd_connected(mdev);
2784
2785         return NO_ERROR;
2786
2787 out_idr_remove_vol:
2788         idr_remove(&tconn->volumes, vnr_got);
2789 out_idr_remove_minor:
2790         idr_remove(&minors, minor_got);
2791         synchronize_rcu();
2792 out_no_minor_idr:
2793         kfree(mdev->current_epoch);
2794 out_no_epoch:
2795         drbd_bm_cleanup(mdev);
2796 out_no_bitmap:
2797         __free_page(mdev->md_io_page);
2798 out_no_io_page:
2799         put_disk(disk);
2800 out_no_disk:
2801         blk_cleanup_queue(q);
2802 out_no_q:
2803         kfree(mdev);
2804         kref_put(&tconn->kref, &conn_destroy);
2805         return err;
2806 }
2807
2808 int __init drbd_init(void)
2809 {
2810         int err;
2811
2812         if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
2813                 printk(KERN_ERR
2814                        "drbd: invalid minor_count (%d)\n", minor_count);
2815 #ifdef MODULE
2816                 return -EINVAL;
2817 #else
2818                 minor_count = DRBD_MINOR_COUNT_DEF;
2819 #endif
2820         }
2821
2822         err = register_blkdev(DRBD_MAJOR, "drbd");
2823         if (err) {
2824                 printk(KERN_ERR
2825                        "drbd: unable to register block device major %d\n",
2826                        DRBD_MAJOR);
2827                 return err;
2828         }
2829
2830         err = drbd_genl_register();
2831         if (err) {
2832                 printk(KERN_ERR "drbd: unable to register generic netlink family\n");
2833                 goto fail;
2834         }
2835
2836
2837         register_reboot_notifier(&drbd_notifier);
2838
2839         /*
2840          * allocate all necessary structs
2841          */
2842         err = -ENOMEM;
2843
2844         init_waitqueue_head(&drbd_pp_wait);
2845
2846         drbd_proc = NULL; /* play safe for drbd_cleanup */
2847         idr_init(&minors);
2848
2849         err = drbd_create_mempools();
2850         if (err)
2851                 goto fail;
2852
2853         drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
2854         if (!drbd_proc) {
2855                 printk(KERN_ERR "drbd: unable to register proc file\n");
2856                 goto fail;
2857         }
2858
2859         rwlock_init(&global_state_lock);
2860         INIT_LIST_HEAD(&drbd_tconns);
2861
2862         printk(KERN_INFO "drbd: initialized. "
2863                "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
2864                API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
2865         printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
2866         printk(KERN_INFO "drbd: registered as block device major %d\n",
2867                 DRBD_MAJOR);
2868
2869         return 0; /* Success! */
2870
2871 fail:
2872         drbd_cleanup();
2873         if (err == -ENOMEM)
2874                 /* currently always the case */
2875                 printk(KERN_ERR "drbd: ran out of memory\n");
2876         else
2877                 printk(KERN_ERR "drbd: initialization failure\n");
2878         return err;
2879 }
2880
2881 void drbd_free_bc(struct drbd_backing_dev *ldev)
2882 {
2883         if (ldev == NULL)
2884                 return;
2885
2886         blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2887         blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
2888
2889         kfree(ldev);
2890 }
2891
2892 void drbd_free_sock(struct drbd_tconn *tconn)
2893 {
2894         if (tconn->data.socket) {
2895                 mutex_lock(&tconn->data.mutex);
2896                 kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
2897                 sock_release(tconn->data.socket);
2898                 tconn->data.socket = NULL;
2899                 mutex_unlock(&tconn->data.mutex);
2900         }
2901         if (tconn->meta.socket) {
2902                 mutex_lock(&tconn->meta.mutex);
2903                 kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
2904                 sock_release(tconn->meta.socket);
2905                 tconn->meta.socket = NULL;
2906                 mutex_unlock(&tconn->meta.mutex);
2907         }
2908 }
2909
2910 /* meta data management */
2911
2912 struct meta_data_on_disk {
2913         u64 la_size;           /* last agreed size. */
2914         u64 uuid[UI_SIZE];   /* UUIDs. */
2915         u64 device_uuid;
2916         u64 reserved_u64_1;
2917         u32 flags;             /* MDF */
2918         u32 magic;
2919         u32 md_size_sect;
2920         u32 al_offset;         /* offset to this block */
2921         u32 al_nr_extents;     /* important for restoring the AL */
2922               /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
2923         u32 bm_offset;         /* offset to the bitmap, from here */
2924         u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
2925         u32 la_peer_max_bio_size;   /* last peer max_bio_size */
2926         u32 reserved_u32[3];
2927
2928 } __packed;
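
/* All multi-byte fields of struct meta_data_on_disk are kept big endian on
 * disk: drbd_md_sync() below converts them with cpu_to_be*() before
 * writing, and drbd_md_read() converts back with be*_to_cpu(). */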
2929
2930 /**
2931  * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
2932  * @mdev:       DRBD device.
2933  */
2934 void drbd_md_sync(struct drbd_conf *mdev)
2935 {
2936         struct meta_data_on_disk *buffer;
2937         sector_t sector;
2938         int i;
2939
2940         del_timer(&mdev->md_sync_timer);
2941         /* timer may be rearmed by drbd_md_mark_dirty() now. */
2942         if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
2943                 return;
2944
2945         /* We use here D_FAILED and not D_ATTACHING because we try to write
2946          * metadata even if we detach due to a disk failure! */
2947         if (!get_ldev_if_state(mdev, D_FAILED))
2948                 return;
2949
2950         buffer = drbd_md_get_buffer(mdev);
2951         if (!buffer)
2952                 goto out;
2953
2954         memset(buffer, 0, 512);
2955
2956         buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
2957         for (i = UI_CURRENT; i < UI_SIZE; i++)
2958                 buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
2959         buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
2960         buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
2961
2962         buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
2963         buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
2964         buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
2965         buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
2966         buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
2967
2968         buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
2969         buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);
2970
2971         D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
2972         sector = mdev->ldev->md.md_offset;
2973
2974         if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
2975                 /* this was a best-effort attempt anyway ... */
2976                 dev_err(DEV, "meta data update failed!\n");
2977                 drbd_chk_io_error(mdev, 1, true);
2978         }
2979
2980         /* Update mdev->ldev->md.la_size_sect,
2981          * since we just wrote the updated size to the on-disk meta data. */
2982         mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
2983
2984         drbd_md_put_buffer(mdev);
2985 out:
2986         put_ldev(mdev);
2987 }
2988
2989 /**
2990  * drbd_md_read() - Reads in the meta data super block
2991  * @mdev:       DRBD device.
2992  * @bdev:       Device from which the meta data should be read in.
2993  *
2994  * Return 0 (NO_ERROR) on success, or an enum drbd_ret_code if
2995  * something goes wrong.
2996  */
2997 int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
2998 {
2999         struct meta_data_on_disk *buffer;
3000         u32 magic, flags;
3001         int i, rv = NO_ERROR;
3002
3003         if (!get_ldev_if_state(mdev, D_ATTACHING))
3004                 return ERR_IO_MD_DISK;
3005
3006         buffer = drbd_md_get_buffer(mdev);
3007         if (!buffer)
3008                 goto out;
3009
3010         if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3011                 /* NOTE: can't do normal error processing here as this is
3012                    called BEFORE disk is attached */
3013                 dev_err(DEV, "Error while reading metadata.\n");
3014                 rv = ERR_IO_MD_DISK;
3015                 goto err;
3016         }
3017
3018         magic = be32_to_cpu(buffer->magic);
3019         flags = be32_to_cpu(buffer->flags);
3020         if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
3021             (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
3022                 /* btw: that's Activity Log clean, not "all" clean. */
3023                 dev_err(DEV, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
3024                 rv = ERR_MD_UNCLEAN;
3025                 goto err;
3026         }
3027         if (magic != DRBD_MD_MAGIC_08) {
3028                 if (magic == DRBD_MD_MAGIC_07)
3029                         dev_err(DEV, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
3030                 else
3031                         dev_err(DEV, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
3032                 rv = ERR_MD_INVALID;
3033                 goto err;
3034         }
3035         if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3036                 dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3037                     be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3038                 rv = ERR_MD_INVALID;
3039                 goto err;
3040         }
3041         if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3042                 dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3043                     be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3044                 rv = ERR_MD_INVALID;
3045                 goto err;
3046         }
3047         if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3048                 dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3049                     be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3050                 rv = ERR_MD_INVALID;
3051                 goto err;
3052         }
3053
3054         if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3055                 dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3056                     be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3057                 rv = ERR_MD_INVALID;
3058                 goto err;
3059         }
3060
3061         bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3062         for (i = UI_CURRENT; i < UI_SIZE; i++)
3063                 bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3064         bdev->md.flags = be32_to_cpu(buffer->flags);
3065         bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3066
3067         spin_lock_irq(&mdev->tconn->req_lock);
3068         if (mdev->state.conn < C_CONNECTED) {
3069                 int peer;
3070                 peer = be32_to_cpu(buffer->la_peer_max_bio_size);
3071                 peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
3072                 mdev->peer_max_bio_size = peer;
3073         }
3074         spin_unlock_irq(&mdev->tconn->req_lock);
3075
3076  err:
3077         drbd_md_put_buffer(mdev);
3078  out:
3079         put_ldev(mdev);
3080
3081         return rv;
3082 }
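/*
 * Illustrative caller sketch (hypothetical; "nbc" and the error label are
 * placeholders, not identifiers from this excerpt): the meta data is read
 * and validated while the disk is still D_ATTACHING, and the attach is
 * aborted on anything but NO_ERROR:
 *
 *	int retcode = drbd_md_read(mdev, nbc);
 *	if (retcode != NO_ERROR)
 *		goto fail;	// ERR_IO_MD_DISK, ERR_MD_UNCLEAN, ERR_MD_INVALID, ...
 */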
3083
3084 /**
3085  * drbd_md_mark_dirty() - Mark meta data super block as dirty
3086  * @mdev:       DRBD device.
3087  *
3088  * Call this function if you change anything that should be written to
3089  * the meta-data super block. This function sets MD_DIRTY, and starts a
3090  * timer that ensures drbd_md_sync() is called within five seconds.
3091  */
3092 #ifdef DEBUG
3093 void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3094 {
3095         if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3096                 mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3097                 mdev->last_md_mark_dirty.line = line;
3098                 mdev->last_md_mark_dirty.func = func;
3099         }
3100 }
3101 #else
3102 void drbd_md_mark_dirty(struct drbd_conf *mdev)
3103 {
3104         if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3105                 mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3106 }
3107 #endif
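/*
 * Flow sketch (illustration only, no additional code path): in-core meta
 * data changes end with drbd_md_mark_dirty() (see e.g. _drbd_uuid_set()
 * below), which sets MD_DIRTY and arms md_sync_timer.  If nothing calls
 * drbd_md_sync() before the timer fires, md_sync_timer_fn() queues
 * w_md_sync() and the worker writes the super block:
 *
 *	// ... modify mdev->ldev->md ...
 *	drbd_md_mark_dirty(mdev);	// MD_DIRTY set, timer armed
 *	// ... later, before or when the timer fires ...
 *	drbd_md_sync(mdev);		// clears MD_DIRTY, writes the block
 */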
3108
3109 static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3110 {
3111         int i;
3112
3113         for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
3114                 mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3115 }
3116
3117 void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3118 {
3119         if (idx == UI_CURRENT) {
3120                 if (mdev->state.role == R_PRIMARY)
3121                         val |= 1;
3122                 else
3123                         val &= ~((u64)1);
3124
3125                 drbd_set_ed_uuid(mdev, val);
3126         }
3127
3128         mdev->ldev->md.uuid[idx] = val;
3129         drbd_md_mark_dirty(mdev);
3130 }
3131
3132
3133 void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3134 {
3135         if (mdev->ldev->md.uuid[idx]) {
3136                 drbd_uuid_move_history(mdev);
3137                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3138         }
3139         _drbd_uuid_set(mdev, idx, val);
3140 }
3141
3142 /**
3143  * drbd_uuid_new_current() - Creates a new current UUID
3144  * @mdev:       DRBD device.
3145  *
3146  * Creates a new current UUID, and rotates the old current UUID into
3147  * the bitmap slot. Causes an incremental resync upon next connect.
3148  */
3149 void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3150 {
3151         u64 val;
3152         unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3153
3154         if (bm_uuid)
3155                 dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3156
3157         mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3158
3159         get_random_bytes(&val, sizeof(u64));
3160         _drbd_uuid_set(mdev, UI_CURRENT, val);
3161         drbd_print_uuids(mdev, "new current UUID");
3162         /* get it to stable storage _now_ */
3163         drbd_md_sync(mdev);
3164 }
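/*
 * Hypothetical call-site sketch (the surrounding condition is made up for
 * illustration; the real call sites are elsewhere in drbd): a new current
 * UUID is typically generated when we start modifying data without a
 * connected peer, so the divergence is detectable on the next connect:
 *
 *	if (get_ldev(mdev)) {
 *		drbd_uuid_new_current(mdev);	// old current UUID -> bitmap slot
 *		put_ldev(mdev);
 *	}
 */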
3165
3166 void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3167 {
3168         if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3169                 return;
3170
3171         if (val == 0) {
3172                 drbd_uuid_move_history(mdev);
3173                 mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3174                 mdev->ldev->md.uuid[UI_BITMAP] = 0;
3175         } else {
3176                 unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
3177                 if (bm_uuid)
3178                         dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);
3179
3180                 mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
3181         }
3182         drbd_md_mark_dirty(mdev);
3183 }
3184
3185 /**
3186  * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3187  * @mdev:       DRBD device.
3188  *
3189  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3190  */
3191 int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3192 {
3193         int rv = -EIO;
3194
3195         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3196                 drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3197                 drbd_md_sync(mdev);
3198                 drbd_bm_set_all(mdev);
3199
3200                 rv = drbd_bm_write(mdev);
3201
3202                 if (!rv) {
3203                         drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3204                         drbd_md_sync(mdev);
3205                 }
3206
3207                 put_ldev(mdev);
3208         }
3209
3210         return rv;
3211 }
3212
3213 /**
3214  * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3215  * @mdev:       DRBD device.
3216  *
3217  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3218  */
3219 int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3220 {
3221         int rv = -EIO;
3222
3223         drbd_resume_al(mdev);
3224         if (get_ldev_if_state(mdev, D_ATTACHING)) {
3225                 drbd_bm_clear_all(mdev);
3226                 rv = drbd_bm_write(mdev);
3227                 put_ldev(mdev);
3228         }
3229
3230         return rv;
3231 }
3232
3233 static int w_bitmap_io(struct drbd_work *w, int unused)
3234 {
3235         struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3236         struct drbd_conf *mdev = w->mdev;
3237         int rv = -EIO;
3238
3239         D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3240
3241         if (get_ldev(mdev)) {
3242                 drbd_bm_lock(mdev, work->why, work->flags);
3243                 rv = work->io_fn(mdev);
3244                 drbd_bm_unlock(mdev);
3245                 put_ldev(mdev);
3246         }
3247
3248         clear_bit_unlock(BITMAP_IO, &mdev->flags);
3249         wake_up(&mdev->misc_wait);
3250
3251         if (work->done)
3252                 work->done(mdev, rv);
3253
3254         clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3255         work->why = NULL;
3256         work->flags = 0;
3257
3258         return 0;
3259 }
3260
3261 void drbd_ldev_destroy(struct drbd_conf *mdev)
3262 {
3263         lc_destroy(mdev->resync);
3264         mdev->resync = NULL;
3265         lc_destroy(mdev->act_log);
3266         mdev->act_log = NULL;
3267         __no_warn(local,
3268                 drbd_free_bc(mdev->ldev);
3269                 mdev->ldev = NULL;);
3270
3271         clear_bit(GO_DISKLESS, &mdev->flags);
3272 }
3273
3274 static int w_go_diskless(struct drbd_work *w, int unused)
3275 {
3276         struct drbd_conf *mdev = w->mdev;
3277
3278         D_ASSERT(mdev->state.disk == D_FAILED);
3279         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
3280          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
3281          * the protected members anymore, though, so once put_ldev reaches zero
3282          * again, it will be safe to free them. */
3283         drbd_force_state(mdev, NS(disk, D_DISKLESS));
3284         return 0;
3285 }
3286
3287 void drbd_go_diskless(struct drbd_conf *mdev)
3288 {
3289         D_ASSERT(mdev->state.disk == D_FAILED);
3290         if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3291                 drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
3292 }
3293
3294 /**
3295  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3296  * @mdev:       DRBD device.
3297  * @io_fn:      IO callback to be called when bitmap IO is possible
3298  * @done:       callback to be called after the bitmap IO was performed
3299  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      Flags passed on to drbd_bm_lock() (enum bm_flag)
3300  *
3301  * While IO on the bitmap happens we freeze application IO, thus ensuring
3302  * that drbd_set_out_of_sync() cannot be called. This function MAY ONLY be
3303  * called from worker context. It MUST NOT be used while a previous such
3304  * work is still pending!
3305  */
3306 void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3307                           int (*io_fn)(struct drbd_conf *),
3308                           void (*done)(struct drbd_conf *, int),
3309                           char *why, enum bm_flag flags)
3310 {
3311         D_ASSERT(current == mdev->tconn->worker.task);
3312
3313         D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3314         D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3315         D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3316         if (mdev->bm_io_work.why)
3317                 dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3318                         why, mdev->bm_io_work.why);
3319
3320         mdev->bm_io_work.io_fn = io_fn;
3321         mdev->bm_io_work.done = done;
3322         mdev->bm_io_work.why = why;
3323         mdev->bm_io_work.flags = flags;
3324
3325         spin_lock_irq(&mdev->tconn->req_lock);
3326         set_bit(BITMAP_IO, &mdev->flags);
3327         if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3328                 if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
3329                         drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
3330         }
3331         spin_unlock_irq(&mdev->tconn->req_lock);
3332 }
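/*
 * Illustrative use of the asynchronous variant (a sketch under assumed
 * names: "example_done" is hypothetical and "bm_flags" stands for a
 * suitable enum bm_flag value; @done may also simply be NULL):
 *
 *	static void example_done(struct drbd_conf *mdev, int rv)
 *	{
 *		if (rv)
 *			dev_err(DEV, "bitmap write failed: %d\n", rv);
 *	}
 *
 *	// worker context only:
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, example_done,
 *			     "set_n_write example", bm_flags);
 */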
3333
3334 /**
3335  * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3336  * @mdev:       DRBD device.
3337  * @io_fn:      IO callback to be called when bitmap IO is possible
3338  * @why:        Descriptive text of the reason for doing the IO
 * @flags:      Flags passed on to drbd_bm_lock() (enum bm_flag)
3339  *
3340  * Freezes application IO while the actual IO operation runs. This
3341  * function MAY NOT be called from worker context.
3342  */
3343 int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
3344                 char *why, enum bm_flag flags)
3345 {
3346         int rv;
3347
3348         D_ASSERT(current != mdev->tconn->worker.task);
3349
3350         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3351                 drbd_suspend_io(mdev);
3352
3353         drbd_bm_lock(mdev, why, flags);
3354         rv = io_fn(mdev);
3355         drbd_bm_unlock(mdev);
3356
3357         if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
3358                 drbd_resume_io(mdev);
3359
3360         return rv;
3361 }
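/*
 * Synchronous counterpart, illustrative only (must NOT run in the worker;
 * "bm_flags" is again a placeholder for a suitable enum bm_flag value):
 *
 *	int rv = drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
 *				"clear_n_write example", bm_flags);
 *	if (rv)
 *		dev_err(DEV, "bitmap clear failed: %d\n", rv);
 */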
3362
3363 void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3364 {
3365         if ((mdev->ldev->md.flags & flag) != flag) {
3366                 drbd_md_mark_dirty(mdev);
3367                 mdev->ldev->md.flags |= flag;
3368         }
3369 }
3370
3371 void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3372 {
3373         if ((mdev->ldev->md.flags & flag) != 0) {
3374                 drbd_md_mark_dirty(mdev);
3375                 mdev->ldev->md.flags &= ~flag;
3376         }
3377 }
3378 int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3379 {
3380         return (bdev->md.flags & flag) != 0;
3381 }
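/*
 * Typical usage of these flag helpers, as already shown above in
 * drbd_bmio_set_n_write(); the flag change is made persistent by a
 * following drbd_md_sync():
 *
 *	if (get_ldev_if_state(mdev, D_ATTACHING)) {
 *		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
 *		drbd_md_sync(mdev);
 *		...
 *		drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
 *		drbd_md_sync(mdev);
 *		put_ldev(mdev);
 *	}
 */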
3382
3383 static void md_sync_timer_fn(unsigned long data)
3384 {
3385         struct drbd_conf *mdev = (struct drbd_conf *) data;
3386
3387         drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
3388 }
3389
3390 static int w_md_sync(struct drbd_work *w, int unused)
3391 {
3392         struct drbd_conf *mdev = w->mdev;
3393
3394         dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
3395 #ifdef DEBUG
3396         dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
3397                 mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
3398 #endif
3399         drbd_md_sync(mdev);
3400         return 0;
3401 }
3402
3403 const char *cmdname(enum drbd_packet cmd)
3404 {
3405         /* THINK may need to become several global tables
3406          * when we want to support more than
3407          * one PRO_VERSION */
3408         static const char *cmdnames[] = {
3409                 [P_DATA]                = "Data",
3410                 [P_DATA_REPLY]          = "DataReply",
3411                 [P_RS_DATA_REPLY]       = "RSDataReply",
3412                 [P_BARRIER]             = "Barrier",
3413                 [P_BITMAP]              = "ReportBitMap",
3414                 [P_BECOME_SYNC_TARGET]  = "BecomeSyncTarget",
3415                 [P_BECOME_SYNC_SOURCE]  = "BecomeSyncSource",
3416                 [P_UNPLUG_REMOTE]       = "UnplugRemote",
3417                 [P_DATA_REQUEST]        = "DataRequest",
3418                 [P_RS_DATA_REQUEST]     = "RSDataRequest",
3419                 [P_SYNC_PARAM]          = "SyncParam",
3420                 [P_SYNC_PARAM89]        = "SyncParam89",
3421                 [P_PROTOCOL]            = "ReportProtocol",
3422                 [P_UUIDS]               = "ReportUUIDs",
3423                 [P_SIZES]               = "ReportSizes",
3424                 [P_STATE]               = "ReportState",
3425                 [P_SYNC_UUID]           = "ReportSyncUUID",
3426                 [P_AUTH_CHALLENGE]      = "AuthChallenge",
3427                 [P_AUTH_RESPONSE]       = "AuthResponse",
3428                 [P_PING]                = "Ping",
3429                 [P_PING_ACK]            = "PingAck",
3430                 [P_RECV_ACK]            = "RecvAck",
3431                 [P_WRITE_ACK]           = "WriteAck",
3432                 [P_RS_WRITE_ACK]        = "RSWriteAck",
3433                 [P_DISCARD_WRITE]        = "DiscardWrite",
3434                 [P_NEG_ACK]             = "NegAck",
3435                 [P_NEG_DREPLY]          = "NegDReply",
3436                 [P_NEG_RS_DREPLY]       = "NegRSDReply",
3437                 [P_BARRIER_ACK]         = "BarrierAck",
3438                 [P_STATE_CHG_REQ]       = "StateChgRequest",
3439                 [P_STATE_CHG_REPLY]     = "StateChgReply",
3440                 [P_OV_REQUEST]          = "OVRequest",
3441                 [P_OV_REPLY]            = "OVReply",
3442                 [P_OV_RESULT]           = "OVResult",
3443                 [P_CSUM_RS_REQUEST]     = "CsumRSRequest",
3444                 [P_RS_IS_IN_SYNC]       = "CsumRSIsInSync",
3445                 [P_COMPRESSED_BITMAP]   = "CBitmap",
3446                 [P_DELAY_PROBE]         = "DelayProbe",
3447                 [P_OUT_OF_SYNC]         = "OutOfSync",
3448                 [P_RETRY_WRITE]         = "RetryWrite",
3449                 [P_RS_CANCEL]           = "RSCancel",
3450                 [P_CONN_ST_CHG_REQ]     = "conn_st_chg_req",
3451                 [P_CONN_ST_CHG_REPLY]   = "conn_st_chg_reply",
3453                 [P_PROTOCOL_UPDATE]     = "protocol_update",
3454
3455                 /* enum drbd_packet, but not commands - obsoleted flags:
3456                  *      P_MAY_IGNORE
3457                  *      P_MAX_OPT_CMD
3458                  */
3459         };
3460
3461         /* too big for the array: 0xfffX */
3462         if (cmd == P_INITIAL_META)
3463                 return "InitialMeta";
3464         if (cmd == P_INITIAL_DATA)
3465                 return "InitialData";
3466         if (cmd == P_CONNECTION_FEATURES)
3467                 return "ConnectionFeatures";
3468         if (cmd >= ARRAY_SIZE(cmdnames))
3469                 return "Unknown";
3470         return cmdnames[cmd];
3471 }
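/*
 * cmdname() is intended for log messages; a hypothetical use:
 *
 *	dev_warn(DEV, "ignoring unexpected packet %s\n", cmdname(cmd));
 *
 * Values beyond the array fall back to "Unknown".
 */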
3472
3473 /**
3474  * drbd_wait_misc  -  wait for a request to make progress
3475  * @mdev:       device associated with the request
3476  * @i:          the struct drbd_interval embedded in struct drbd_request or
3477  *              struct drbd_peer_request
3478  */
3479 int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
3480 {
3481         struct net_conf *nc;
3482         DEFINE_WAIT(wait);
3483         long timeout;
3484
3485         rcu_read_lock();
3486         nc = rcu_dereference(mdev->tconn->net_conf);
3487         if (!nc) {
3488                 rcu_read_unlock();
3489                 return -ETIMEDOUT;
3490         }
3491         timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
3492         rcu_read_unlock();
3493
3494         /* Indicate that mdev->misc_wait should be woken up on progress. */
3495         i->waiting = true;
3496         prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
3497         spin_unlock_irq(&mdev->tconn->req_lock);
3498         timeout = schedule_timeout(timeout);
3499         finish_wait(&mdev->misc_wait, &wait);
3500         spin_lock_irq(&mdev->tconn->req_lock);
3501         if (!timeout || mdev->state.conn < C_CONNECTED)
3502                 return -ETIMEDOUT;
3503         if (signal_pending(current))
3504                 return -ERESTARTSYS;
3505         return 0;
3506 }
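/*
 * Locking note with a hypothetical caller sketch ("request_is_busy" is a
 * placeholder condition): drbd_wait_misc() is entered with tconn->req_lock
 * held, drops it while sleeping, and re-acquires it before returning
 * 0, -ETIMEDOUT or -ERESTARTSYS:
 *
 *	spin_lock_irq(&mdev->tconn->req_lock);
 *	while (request_is_busy(i)) {
 *		err = drbd_wait_misc(mdev, i);
 *		if (err)
 *			break;
 *	}
 *	spin_unlock_irq(&mdev->tconn->req_lock);
 */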
3507
3508 #ifdef CONFIG_DRBD_FAULT_INJECTION
3509 /* Fault insertion support, including a random number generator shamelessly
3510  * stolen from kernel/rcutorture.c */
3511 struct fault_random_state {
3512         unsigned long state;
3513         unsigned long count;
3514 };
3515
3516 #define FAULT_RANDOM_MULT 39916801  /* prime */
3517 #define FAULT_RANDOM_ADD        479001701 /* prime */
3518 #define FAULT_RANDOM_REFRESH 10000
3519
3520 /*
3521  * Crude but fast random-number generator.  Uses a linear congruential
3522  * generator, with occasional help from get_random_bytes().
3523  */
3524 static unsigned long
3525 _drbd_fault_random(struct fault_random_state *rsp)
3526 {
3527         long refresh;
3528
3529         if (!rsp->count--) {
3530                 get_random_bytes(&refresh, sizeof(refresh));
3531                 rsp->state += refresh;
3532                 rsp->count = FAULT_RANDOM_REFRESH;
3533         }
3534         rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3535         return swahw32(rsp->state);
3536 }
3537
3538 static char *
3539 _drbd_fault_str(unsigned int type) {
3540         static char *_faults[] = {
3541                 [DRBD_FAULT_MD_WR] = "Meta-data write",
3542                 [DRBD_FAULT_MD_RD] = "Meta-data read",
3543                 [DRBD_FAULT_RS_WR] = "Resync write",
3544                 [DRBD_FAULT_RS_RD] = "Resync read",
3545                 [DRBD_FAULT_DT_WR] = "Data write",
3546                 [DRBD_FAULT_DT_RD] = "Data read",
3547                 [DRBD_FAULT_DT_RA] = "Data read ahead",
3548                 [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3549                 [DRBD_FAULT_AL_EE] = "EE allocation",
3550                 [DRBD_FAULT_RECEIVE] = "receive data corruption",
3551         };
3552
3553         return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3554 }
3555
3556 unsigned int
3557 _drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3558 {
3559         static struct fault_random_state rrs = {0, 0};
3560
3561         unsigned int ret = (
3562                 (fault_devs == 0 ||
3563                         ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3564                 (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3565
3566         if (ret) {
3567                 fault_count++;
3568
3569                 if (__ratelimit(&drbd_ratelimit_state))
3570                         dev_warn(DEV, "***Simulating %s failure\n",
3571                                 _drbd_fault_str(type));
3572         }
3573
3574         return ret;
3575 }
3576 #endif
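/*
 * Hypothetical fault-injection call site (a sketch; the real I/O submission
 * paths are outside this excerpt):
 *
 *	if (_drbd_insert_fault(mdev, DRBD_FAULT_MD_WR))
 *		return -EIO;	// pretend the meta data write failed
 *
 * Whether a fault fires is controlled by the fault_rate and fault_devs
 * module parameters evaluated in _drbd_insert_fault() above.
 */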
3577
3578 const char *drbd_buildtag(void)
3579 {
3580         /* When DRBD is built from external sources, this holds a reference
3581            to the git hash of the source code. */
3582
3583         static char buildtag[38] = "\0uilt-in";
3584
3585         if (buildtag[0] == 0) {
3586 #ifdef CONFIG_MODULES
3587                 if (THIS_MODULE != NULL)
3588                         sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
3589                 else
3590 #endif
3591                         buildtag[0] = 'b';
3592         }
3593
3594         return buildtag;
3595 }
3596
3597 module_init(drbd_init)
3598 module_exit(drbd_cleanup)
3599
3600 EXPORT_SYMBOL(drbd_conn_str);
3601 EXPORT_SYMBOL(drbd_role_str);
3602 EXPORT_SYMBOL(drbd_disk_str);
3603 EXPORT_SYMBOL(drbd_set_st_err_str);