drbd: struct drbd_peer_request: Use drbd_work instead of drbd_device_work
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49
50 #include "drbd_vli.h"
51
52 struct packet_info {
53         enum drbd_packet cmd;
54         unsigned int size;
55         unsigned int vnr;
56         void *data;
57 };
58
59 enum finish_epoch {
60         FE_STILL_LIVE,
61         FE_DESTROYED,
62         FE_RECYCLED,
63 };
64
65 static int drbd_do_features(struct drbd_connection *connection);
66 static int drbd_do_auth(struct drbd_connection *connection);
67 static int drbd_disconnected(struct drbd_peer_device *);
68
69 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
70 static int e_end_block(struct drbd_work *, int);
71
72
73 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
74
75 /*
76  * some helper functions to deal with singly linked page lists,
77  * page->private being our "next" pointer.
78  */
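/*
 * Illustrative sketch (editorial, not part of the driver): a chain of three
 * pages linked this way looks like
 *
 *	*head -> A -> B -> C -> NULL
 *
 * where each "->" is page_private() cast back to a struct page *, and the
 * tail is terminated with set_page_private(tail, 0).
 */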
79
80 /* If at least n pages are linked at head, get n pages off.
81  * Otherwise, don't modify head, and return NULL.
82  * Locking is the responsibility of the caller.
83  */
84 static struct page *page_chain_del(struct page **head, int n)
85 {
86         struct page *page;
87         struct page *tmp;
88
89         BUG_ON(!n);
90         BUG_ON(!head);
91
92         page = *head;
93
94         if (!page)
95                 return NULL;
96
97         while (page) {
98                 tmp = page_chain_next(page);
99                 if (--n == 0)
100                         break; /* found sufficient pages */
101                 if (tmp == NULL)
102                         /* insufficient pages, don't use any of them. */
103                         return NULL;
104                 page = tmp;
105         }
106
107         /* add end of list marker for the returned list */
108         set_page_private(page, 0);
109         /* actual return value, and adjustment of head */
110         page = *head;
111         *head = tmp;
112         return page;
113 }
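/*
 * Hedged usage sketch, mirroring __drbd_alloc_pages() below; the pool names
 * are taken from this file, the local variable "chain" is hypothetical:
 *
 *	spin_lock(&drbd_pp_lock);
 *	chain = page_chain_del(&drbd_pp_pool, n);
 *	if (chain)
 *		drbd_pp_vacant -= n;
 *	spin_unlock(&drbd_pp_lock);
 */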
114
115 /* may be used outside of locks to find the tail of a (usually short)
116  * "private" page chain, before adding it back to a global chain head
117  * with page_chain_add() under a spinlock. */
118 static struct page *page_chain_tail(struct page *page, int *len)
119 {
120         struct page *tmp;
121         int i = 1;
122         while ((tmp = page_chain_next(page)))
123                 ++i, page = tmp;
124         if (len)
125                 *len = i;
126         return page;
127 }
128
129 static int page_chain_free(struct page *page)
130 {
131         struct page *tmp;
132         int i = 0;
133         page_chain_for_each_safe(page, tmp) {
134                 put_page(page);
135                 ++i;
136         }
137         return i;
138 }
139
140 static void page_chain_add(struct page **head,
141                 struct page *chain_first, struct page *chain_last)
142 {
143 #if 1
144         struct page *tmp;
145         tmp = page_chain_tail(chain_first, NULL);
146         BUG_ON(tmp != chain_last);
147 #endif
148
149         /* add chain to head */
150         set_page_private(chain_last, (unsigned long)*head);
151         *head = chain_first;
152 }
153
154 static struct page *__drbd_alloc_pages(struct drbd_device *device,
155                                        unsigned int number)
156 {
157         struct page *page = NULL;
158         struct page *tmp = NULL;
159         unsigned int i = 0;
160
161         /* Yes, testing drbd_pp_vacant outside the lock is racy.
162          * So what. It saves a spin_lock. */
163         if (drbd_pp_vacant >= number) {
164                 spin_lock(&drbd_pp_lock);
165                 page = page_chain_del(&drbd_pp_pool, number);
166                 if (page)
167                         drbd_pp_vacant -= number;
168                 spin_unlock(&drbd_pp_lock);
169                 if (page)
170                         return page;
171         }
172
173         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
174          * "criss-cross" setup, that might cause write-out on some other DRBD,
175          * which in turn might block on the other node at this very place.  */
176         for (i = 0; i < number; i++) {
177                 tmp = alloc_page(GFP_TRY);
178                 if (!tmp)
179                         break;
180                 set_page_private(tmp, (unsigned long)page);
181                 page = tmp;
182         }
183
184         if (i == number)
185                 return page;
186
187         /* Not enough pages immediately available this time.
188          * No need to jump around here, drbd_alloc_pages will retry this
189          * function "soon". */
190         if (page) {
191                 tmp = page_chain_tail(page, NULL);
192                 spin_lock(&drbd_pp_lock);
193                 page_chain_add(&drbd_pp_pool, page, tmp);
194                 drbd_pp_vacant += i;
195                 spin_unlock(&drbd_pp_lock);
196         }
197         return NULL;
198 }
199
200 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
201                                            struct list_head *to_be_freed)
202 {
203         struct drbd_peer_request *peer_req, *tmp;
204
205         /* The EEs are always appended to the end of the list. Since
206            they are sent in order over the wire, they have to finish
207            in order. As soon as we see the first unfinished one, we
208            can stop examining the list... */
209
210         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
211                 if (drbd_peer_req_has_active_page(peer_req))
212                         break;
213                 list_move(&peer_req->w.list, to_be_freed);
214         }
215 }
216
217 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
218 {
219         LIST_HEAD(reclaimed);
220         struct drbd_peer_request *peer_req, *t;
221
222         spin_lock_irq(&device->resource->req_lock);
223         reclaim_finished_net_peer_reqs(device, &reclaimed);
224         spin_unlock_irq(&device->resource->req_lock);
225
226         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227                 drbd_free_net_peer_req(device, peer_req);
228 }
229
230 /**
231  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
232  * @peer_device: DRBD peer device.
233  * @number:     number of pages requested
234  * @retry:      whether to retry, if not enough pages are available right now
235  *
236  * Tries to allocate @number pages, first from our own page pool, then from
237  * the kernel, unless this allocation would exceed the max_buffers setting.
238  * Possibly retry until DRBD frees sufficient pages somewhere else.
239  *
240  * Returns a page chain linked via page->private.
241  */
242 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
243                               bool retry)
244 {
245         struct drbd_device *device = peer_device->device;
246         struct page *page = NULL;
247         struct net_conf *nc;
248         DEFINE_WAIT(wait);
249         int mxb;
250
251         /* Yes, we may run up to @number over max_buffers. If we
252          * follow it strictly, the admin will get it wrong anyway. */
253         rcu_read_lock();
254         nc = rcu_dereference(peer_device->connection->net_conf);
255         mxb = nc ? nc->max_buffers : 1000000;
256         rcu_read_unlock();
257
258         if (atomic_read(&device->pp_in_use) < mxb)
259                 page = __drbd_alloc_pages(device, number);
260
261         while (page == NULL) {
262                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
263
264                 drbd_kick_lo_and_reclaim_net(device);
265
266                 if (atomic_read(&device->pp_in_use) < mxb) {
267                         page = __drbd_alloc_pages(device, number);
268                         if (page)
269                                 break;
270                 }
271
272                 if (!retry)
273                         break;
274
275                 if (signal_pending(current)) {
276                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
277                         break;
278                 }
279
280                 schedule();
281         }
282         finish_wait(&drbd_pp_wait, &wait);
283
284         if (page)
285                 atomic_add(number, &device->pp_in_use);
286         return page;
287 }
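/*
 * Hypothetical caller sketch (editorial, illustrative only): the receive
 * paths allocate a chain for incoming data and hand the pages back through
 * drbd_free_pages() (below) once the backing I/O has completed:
 *
 *	page = drbd_alloc_pages(peer_device, nr_pages, true);
 *	if (!page)
 *		return NULL;	// no memory, or interrupted by a signal
 *	...
 *	drbd_free_pages(device, page, 0);
 */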
288
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290  * Is also used from inside another spin_lock_irq(&resource->req_lock);
291  * Either links the page chain back to the global pool,
292  * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
294 {
295         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
296         int i;
297
298         if (page == NULL)
299                 return;
300
301         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302                 i = page_chain_free(page);
303         else {
304                 struct page *tmp;
305                 tmp = page_chain_tail(page, &i);
306                 spin_lock(&drbd_pp_lock);
307                 page_chain_add(&drbd_pp_pool, page, tmp);
308                 drbd_pp_vacant += i;
309                 spin_unlock(&drbd_pp_lock);
310         }
311         i = atomic_sub_return(i, a);
312         if (i < 0)
313                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
314                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315         wake_up(&drbd_pp_wait);
316 }
317
318 /*
319 You need to hold the req_lock:
320  _drbd_wait_ee_list_empty()
321
322 You must not have the req_lock:
323  drbd_free_peer_req()
324  drbd_alloc_peer_req()
325  drbd_free_peer_reqs()
326  drbd_ee_fix_bhs()
327  drbd_finish_peer_reqs()
328  drbd_clear_done_ee()
329  drbd_wait_ee_list_empty()
330 */
331
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
334                     unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
335 {
336         struct drbd_device *device = peer_device->device;
337         struct drbd_peer_request *peer_req;
338         struct page *page = NULL;
339         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
340
341         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
342                 return NULL;
343
344         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
345         if (!peer_req) {
346                 if (!(gfp_mask & __GFP_NOWARN))
347                         drbd_err(device, "%s: allocation failed\n", __func__);
348                 return NULL;
349         }
350
351         if (data_size) {
352                 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
353                 if (!page)
354                         goto fail;
355         }
356
357         drbd_clear_interval(&peer_req->i);
358         peer_req->i.size = data_size;
359         peer_req->i.sector = sector;
360         peer_req->i.local = false;
361         peer_req->i.waiting = false;
362
363         peer_req->epoch = NULL;
364         peer_req->peer_device = peer_device;
365         peer_req->pages = page;
366         atomic_set(&peer_req->pending_bios, 0);
367         peer_req->flags = 0;
368         /*
369          * The block_id is opaque to the receiver.  It is not endianness
370          * converted, and sent back to the sender unchanged.
371          */
372         peer_req->block_id = id;
373
374         return peer_req;
375
376  fail:
377         mempool_free(peer_req, drbd_ee_mempool);
378         return NULL;
379 }
380
381 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
382                        int is_net)
383 {
384         if (peer_req->flags & EE_HAS_DIGEST)
385                 kfree(peer_req->digest);
386         drbd_free_pages(device, peer_req->pages, is_net);
387         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
388         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
389         mempool_free(peer_req, drbd_ee_mempool);
390 }
391
392 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
393 {
394         LIST_HEAD(work_list);
395         struct drbd_peer_request *peer_req, *t;
396         int count = 0;
397         int is_net = list == &device->net_ee;
398
399         spin_lock_irq(&device->resource->req_lock);
400         list_splice_init(list, &work_list);
401         spin_unlock_irq(&device->resource->req_lock);
402
403         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
404                 __drbd_free_peer_req(device, peer_req, is_net);
405                 count++;
406         }
407         return count;
408 }
409
410 /*
411  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
412  */
413 static int drbd_finish_peer_reqs(struct drbd_device *device)
414 {
415         LIST_HEAD(work_list);
416         LIST_HEAD(reclaimed);
417         struct drbd_peer_request *peer_req, *t;
418         int err = 0;
419
420         spin_lock_irq(&device->resource->req_lock);
421         reclaim_finished_net_peer_reqs(device, &reclaimed);
422         list_splice_init(&device->done_ee, &work_list);
423         spin_unlock_irq(&device->resource->req_lock);
424
425         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
426                 drbd_free_net_peer_req(device, peer_req);
427
428         /* possible callbacks here:
429          * e_end_block, e_end_resync_block, and e_send_superseded.
430          * All ignore the last argument.
431          */
432         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
433                 int err2;
434
435                 /* list_del not necessary, next/prev members not touched */
436                 err2 = peer_req->w.cb(&peer_req->w, !!err);
437                 if (!err)
438                         err = err2;
439                 drbd_free_peer_req(device, peer_req);
440         }
441         wake_up(&device->ee_wait);
442
443         return err;
444 }
445
446 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
447                                      struct list_head *head)
448 {
449         DEFINE_WAIT(wait);
450
451         /* avoids spin_lock/unlock
452          * and calling prepare_to_wait in the fast path */
453         while (!list_empty(head)) {
454                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
455                 spin_unlock_irq(&device->resource->req_lock);
456                 io_schedule();
457                 finish_wait(&device->ee_wait, &wait);
458                 spin_lock_irq(&device->resource->req_lock);
459         }
460 }
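/*
 * Note (editorial): _drbd_wait_ee_list_empty() is entered with
 * resource->req_lock held, but drops and re-takes it around io_schedule().
 * Callers therefore must not assume the lock was held continuously across
 * the wait; see also the locking rules listed above drbd_alloc_peer_req().
 */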
461
462 static void drbd_wait_ee_list_empty(struct drbd_device *device,
463                                     struct list_head *head)
464 {
465         spin_lock_irq(&device->resource->req_lock);
466         _drbd_wait_ee_list_empty(device, head);
467         spin_unlock_irq(&device->resource->req_lock);
468 }
469
470 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
471 {
472         mm_segment_t oldfs;
473         struct kvec iov = {
474                 .iov_base = buf,
475                 .iov_len = size,
476         };
477         struct msghdr msg = {
478                 .msg_iovlen = 1,
479                 .msg_iov = (struct iovec *)&iov,
480                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
481         };
482         int rv;
483
484         oldfs = get_fs();
485         set_fs(KERNEL_DS);
486         rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
487         set_fs(oldfs);
488
489         return rv;
490 }
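/*
 * Note (editorial): the get_fs()/set_fs(KERNEL_DS) dance above makes
 * sock_recvmsg() accept the kernel-space buffer. The kernel offers a helper
 * that hides this address-space switch; a rough equivalent sketch:
 *
 *	struct kvec iov = { .iov_base = buf, .iov_len = size };
 *	struct msghdr msg = { .msg_flags = flags ? flags : MSG_WAITALL | MSG_NOSIGNAL };
 *	rv = kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
 *
 * Illustrative only; this driver keeps the open-coded variant.
 */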
491
492 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
493 {
494         int rv;
495
496         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
497
498         if (rv < 0) {
499                 if (rv == -ECONNRESET)
500                         drbd_info(connection, "sock was reset by peer\n");
501                 else if (rv != -ERESTARTSYS)
502                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
503         } else if (rv == 0) {
504                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
505                         long t;
506                         rcu_read_lock();
507                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
508                         rcu_read_unlock();
509
510                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
511
512                         if (t)
513                                 goto out;
514                 }
515                 drbd_info(connection, "sock was shut down by peer\n");
516         }
517
518         if (rv != size)
519                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
520
521 out:
522         return rv;
523 }
524
525 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
526 {
527         int err;
528
529         err = drbd_recv(connection, buf, size);
530         if (err != size) {
531                 if (err >= 0)
532                         err = -EIO;
533         } else
534                 err = 0;
535         return err;
536 }
537
538 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
539 {
540         int err;
541
542         err = drbd_recv_all(connection, buf, size);
543         if (err && !signal_pending(current))
544                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
545         return err;
546 }
547
548 /* quoting tcp(7):
549  *   On individual connections, the socket buffer size must be set prior to the
550  *   listen(2) or connect(2) calls in order to have it take effect.
551  * This is our wrapper to do so.
552  */
553 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
554                 unsigned int rcv)
555 {
556         /* open coded SO_SNDBUF, SO_RCVBUF */
557         if (snd) {
558                 sock->sk->sk_sndbuf = snd;
559                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
560         }
561         if (rcv) {
562                 sock->sk->sk_rcvbuf = rcv;
563                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
564         }
565 }
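/*
 * Userspace analogue (editorial, illustrative only): the open-coded
 * assignments above correspond to
 *
 *	setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &snd, sizeof(snd));
 *	setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcv, sizeof(rcv));
 *
 * issued before listen(2) or connect(2), as the tcp(7) quote requires.
 */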
566
567 static struct socket *drbd_try_connect(struct drbd_connection *connection)
568 {
569         const char *what;
570         struct socket *sock;
571         struct sockaddr_in6 src_in6;
572         struct sockaddr_in6 peer_in6;
573         struct net_conf *nc;
574         int err, peer_addr_len, my_addr_len;
575         int sndbuf_size, rcvbuf_size, connect_int;
576         int disconnect_on_error = 1;
577
578         rcu_read_lock();
579         nc = rcu_dereference(connection->net_conf);
580         if (!nc) {
581                 rcu_read_unlock();
582                 return NULL;
583         }
584         sndbuf_size = nc->sndbuf_size;
585         rcvbuf_size = nc->rcvbuf_size;
586         connect_int = nc->connect_int;
587         rcu_read_unlock();
588
589         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
590         memcpy(&src_in6, &connection->my_addr, my_addr_len);
591
592         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
593                 src_in6.sin6_port = 0;
594         else
595                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
596
597         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
598         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
599
600         what = "sock_create_kern";
601         err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
602                                SOCK_STREAM, IPPROTO_TCP, &sock);
603         if (err < 0) {
604                 sock = NULL;
605                 goto out;
606         }
607
608         sock->sk->sk_rcvtimeo =
609         sock->sk->sk_sndtimeo = connect_int * HZ;
610         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
611
612         /* explicitly bind to the configured IP as source IP
613          * for the outgoing connections.
614          * This is needed for multihomed hosts and to be
615          * able to use lo: interfaces for drbd.
616          * Make sure to use 0 as port number, so linux selects
617          * a free one dynamically.
618          */
619         what = "bind before connect";
620         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
621         if (err < 0)
622                 goto out;
623
624         /* connect may fail, peer not yet available.
625          * stay C_WF_CONNECTION, don't go Disconnecting! */
626         disconnect_on_error = 0;
627         what = "connect";
628         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
629
630 out:
631         if (err < 0) {
632                 if (sock) {
633                         sock_release(sock);
634                         sock = NULL;
635                 }
636                 switch (-err) {
637                         /* timeout, busy, signal pending */
638                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
639                 case EINTR: case ERESTARTSYS:
640                         /* peer not (yet) available, network problem */
641                 case ECONNREFUSED: case ENETUNREACH:
642                 case EHOSTDOWN:    case EHOSTUNREACH:
643                         disconnect_on_error = 0;
644                         break;
645                 default:
646                         drbd_err(connection, "%s failed, err = %d\n", what, err);
647                 }
648                 if (disconnect_on_error)
649                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
650         }
651
652         return sock;
653 }
654
655 struct accept_wait_data {
656         struct drbd_connection *connection;
657         struct socket *s_listen;
658         struct completion door_bell;
659         void (*original_sk_state_change)(struct sock *sk);
660
661 };
662
663 static void drbd_incoming_connection(struct sock *sk)
664 {
665         struct accept_wait_data *ad = sk->sk_user_data;
666         void (*state_change)(struct sock *sk);
667
668         state_change = ad->original_sk_state_change;
669         if (sk->sk_state == TCP_ESTABLISHED)
670                 complete(&ad->door_bell);
671         state_change(sk);
672 }
673
674 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
675 {
676         int err, sndbuf_size, rcvbuf_size, my_addr_len;
677         struct sockaddr_in6 my_addr;
678         struct socket *s_listen;
679         struct net_conf *nc;
680         const char *what;
681
682         rcu_read_lock();
683         nc = rcu_dereference(connection->net_conf);
684         if (!nc) {
685                 rcu_read_unlock();
686                 return -EIO;
687         }
688         sndbuf_size = nc->sndbuf_size;
689         rcvbuf_size = nc->rcvbuf_size;
690         rcu_read_unlock();
691
692         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
693         memcpy(&my_addr, &connection->my_addr, my_addr_len);
694
695         what = "sock_create_kern";
696         err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
697                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
698         if (err) {
699                 s_listen = NULL;
700                 goto out;
701         }
702
703         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
704         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
705
706         what = "bind before listen";
707         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
708         if (err < 0)
709                 goto out;
710
711         ad->s_listen = s_listen;
712         write_lock_bh(&s_listen->sk->sk_callback_lock);
713         ad->original_sk_state_change = s_listen->sk->sk_state_change;
714         s_listen->sk->sk_state_change = drbd_incoming_connection;
715         s_listen->sk->sk_user_data = ad;
716         write_unlock_bh(&s_listen->sk->sk_callback_lock);
717
718         what = "listen";
719         err = s_listen->ops->listen(s_listen, 5);
720         if (err < 0)
721                 goto out;
722
723         return 0;
724 out:
725         if (s_listen)
726                 sock_release(s_listen);
727         if (err < 0) {
728                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
729                         drbd_err(connection, "%s failed, err = %d\n", what, err);
730                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
731                 }
732         }
733
734         return -EIO;
735 }
736
737 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
738 {
739         write_lock_bh(&sk->sk_callback_lock);
740         sk->sk_state_change = ad->original_sk_state_change;
741         sk->sk_user_data = NULL;
742         write_unlock_bh(&sk->sk_callback_lock);
743 }
744
745 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
746 {
747         int timeo, connect_int, err = 0;
748         struct socket *s_estab = NULL;
749         struct net_conf *nc;
750
751         rcu_read_lock();
752         nc = rcu_dereference(connection->net_conf);
753         if (!nc) {
754                 rcu_read_unlock();
755                 return NULL;
756         }
757         connect_int = nc->connect_int;
758         rcu_read_unlock();
759
760         timeo = connect_int * HZ;
761         /* 28.5% random jitter */
762         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
763
764         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
765         if (err <= 0)
766                 return NULL;
767
768         err = kernel_accept(ad->s_listen, &s_estab, 0);
769         if (err < 0) {
770                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
771                         drbd_err(connection, "accept failed, err = %d\n", err);
772                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
773                 }
774         }
775
776         if (s_estab)
777                 unregister_state_change(s_estab->sk, ad);
778
779         return s_estab;
780 }
781
782 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
783
784 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
785                              enum drbd_packet cmd)
786 {
787         if (!conn_prepare_command(connection, sock))
788                 return -EIO;
789         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
790 }
791
792 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
793 {
794         unsigned int header_size = drbd_header_size(connection);
795         struct packet_info pi;
796         int err;
797
798         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
799         if (err != header_size) {
800                 if (err >= 0)
801                         err = -EIO;
802                 return err;
803         }
804         err = decode_header(connection, connection->data.rbuf, &pi);
805         if (err)
806                 return err;
807         return pi.cmd;
808 }
809
810 /**
811  * drbd_socket_okay() - Free the socket if its connection is not okay
812  * @sock:       pointer to the pointer to the socket.
813  */
814 static int drbd_socket_okay(struct socket **sock)
815 {
816         int rr;
817         char tb[4];
818
819         if (!*sock)
820                 return false;
821
822         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
823
824         if (rr > 0 || rr == -EAGAIN) {
825                 return true;
826         } else {
827                 sock_release(*sock);
828                 *sock = NULL;
829                 return false;
830         }
831 }
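/*
 * Note (editorial): the MSG_DONTWAIT | MSG_PEEK probe above distinguishes
 * three cases without consuming data: rr > 0 means data is pending,
 * rr == -EAGAIN means the connection is alive but idle, and rr == 0 (or any
 * other error) means the peer has shut the socket down, so it is released.
 */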
832 /* Called when a connection is established, or when a new minor gets created
833    in a connection. */
834 int drbd_connected(struct drbd_peer_device *peer_device)
835 {
836         struct drbd_device *device = peer_device->device;
837         int err;
838
839         atomic_set(&device->packet_seq, 0);
840         device->peer_seq = 0;
841
842         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
843                 &peer_device->connection->cstate_mutex :
844                 &device->own_state_mutex;
845
846         err = drbd_send_sync_param(peer_device);
847         if (!err)
848                 err = drbd_send_sizes(peer_device, 0, 0);
849         if (!err)
850                 err = drbd_send_uuids(peer_device);
851         if (!err)
852                 err = drbd_send_current_state(peer_device);
853         clear_bit(USE_DEGR_WFC_T, &device->flags);
854         clear_bit(RESIZE_PENDING, &device->flags);
855         atomic_set(&device->ap_in_flight, 0);
856         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
857         return err;
858 }
859
860 /*
861  * return values:
862  *   1 yes, we have a valid connection
863  *   0 oops, did not work out, please try again
864  *  -1 peer talks different language,
865  *     no point in trying again, please go standalone.
866  *  -2 We do not have a network config...
867  */
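/*
 * Overview (editorial): conn_connect() establishes two TCP sockets per
 * connection, "sock" for data (P_INITIAL_DATA) and "msock" for meta data
 * (P_INITIAL_META). Both sides connect and listen simultaneously; if the
 * initial packets cross, one socket is dropped and, based on a coin flip
 * (prandom_u32() & 1), the accept path is retried. The RESOLVE_CONFLICTS
 * flag then records which side will arbitrate conflicting writes.
 */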
868 static int conn_connect(struct drbd_connection *connection)
869 {
870         struct drbd_socket sock, msock;
871         struct drbd_peer_device *peer_device;
872         struct net_conf *nc;
873         int vnr, timeout, h, ok;
874         bool discard_my_data;
875         enum drbd_state_rv rv;
876         struct accept_wait_data ad = {
877                 .connection = connection,
878                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
879         };
880
881         clear_bit(DISCONNECT_SENT, &connection->flags);
882         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
883                 return -2;
884
885         mutex_init(&sock.mutex);
886         sock.sbuf = connection->data.sbuf;
887         sock.rbuf = connection->data.rbuf;
888         sock.socket = NULL;
889         mutex_init(&msock.mutex);
890         msock.sbuf = connection->meta.sbuf;
891         msock.rbuf = connection->meta.rbuf;
892         msock.socket = NULL;
893
894         /* Assume that the peer only understands protocol 80 until we know better.  */
895         connection->agreed_pro_version = 80;
896
897         if (prepare_listen_socket(connection, &ad))
898                 return 0;
899
900         do {
901                 struct socket *s;
902
903                 s = drbd_try_connect(connection);
904                 if (s) {
905                         if (!sock.socket) {
906                                 sock.socket = s;
907                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
908                         } else if (!msock.socket) {
909                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
910                                 msock.socket = s;
911                                 send_first_packet(connection, &msock, P_INITIAL_META);
912                         } else {
913                                 drbd_err(connection, "Logic error in conn_connect()\n");
914                                 goto out_release_sockets;
915                         }
916                 }
917
918                 if (sock.socket && msock.socket) {
919                         rcu_read_lock();
920                         nc = rcu_dereference(connection->net_conf);
921                         timeout = nc->ping_timeo * HZ / 10;
922                         rcu_read_unlock();
923                         schedule_timeout_interruptible(timeout);
924                         ok = drbd_socket_okay(&sock.socket);
925                         ok = drbd_socket_okay(&msock.socket) && ok;
926                         if (ok)
927                                 break;
928                 }
929
930 retry:
931                 s = drbd_wait_for_connect(connection, &ad);
932                 if (s) {
933                         int fp = receive_first_packet(connection, s);
934                         drbd_socket_okay(&sock.socket);
935                         drbd_socket_okay(&msock.socket);
936                         switch (fp) {
937                         case P_INITIAL_DATA:
938                                 if (sock.socket) {
939                                         drbd_warn(connection, "initial packet S crossed\n");
940                                         sock_release(sock.socket);
941                                         sock.socket = s;
942                                         goto randomize;
943                                 }
944                                 sock.socket = s;
945                                 break;
946                         case P_INITIAL_META:
947                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
948                                 if (msock.socket) {
949                                         drbd_warn(connection, "initial packet M crossed\n");
950                                         sock_release(msock.socket);
951                                         msock.socket = s;
952                                         goto randomize;
953                                 }
954                                 msock.socket = s;
955                                 break;
956                         default:
957                                 drbd_warn(connection, "Error receiving initial packet\n");
958                                 sock_release(s);
959 randomize:
960                                 if (prandom_u32() & 1)
961                                         goto retry;
962                         }
963                 }
964
965                 if (connection->cstate <= C_DISCONNECTING)
966                         goto out_release_sockets;
967                 if (signal_pending(current)) {
968                         flush_signals(current);
969                         smp_rmb();
970                         if (get_t_state(&connection->receiver) == EXITING)
971                                 goto out_release_sockets;
972                 }
973
974                 ok = drbd_socket_okay(&sock.socket);
975                 ok = drbd_socket_okay(&msock.socket) && ok;
976         } while (!ok);
977
978         if (ad.s_listen)
979                 sock_release(ad.s_listen);
980
981         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
982         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
983
984         sock.socket->sk->sk_allocation = GFP_NOIO;
985         msock.socket->sk->sk_allocation = GFP_NOIO;
986
987         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
988         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
989
990         /* NOT YET ...
991          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
992          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
993          * first set it to the P_CONNECTION_FEATURES timeout,
994          * which we set to 4x the configured ping_timeout. */
995         rcu_read_lock();
996         nc = rcu_dereference(connection->net_conf);
997
998         sock.socket->sk->sk_sndtimeo =
999         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1000
1001         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1002         timeout = nc->timeout * HZ / 10;
1003         discard_my_data = nc->discard_my_data;
1004         rcu_read_unlock();
1005
1006         msock.socket->sk->sk_sndtimeo = timeout;
1007
1008         /* we don't want delays.
1009          * we use TCP_CORK where appropriate, though */
1010         drbd_tcp_nodelay(sock.socket);
1011         drbd_tcp_nodelay(msock.socket);
1012
1013         connection->data.socket = sock.socket;
1014         connection->meta.socket = msock.socket;
1015         connection->last_received = jiffies;
1016
1017         h = drbd_do_features(connection);
1018         if (h <= 0)
1019                 return h;
1020
1021         if (connection->cram_hmac_tfm) {
1022                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1023                 switch (drbd_do_auth(connection)) {
1024                 case -1:
1025                         drbd_err(connection, "Authentication of peer failed\n");
1026                         return -1;
1027                 case 0:
1028                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1029                         return 0;
1030                 }
1031         }
1032
1033         connection->data.socket->sk->sk_sndtimeo = timeout;
1034         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1035
1036         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1037                 return -1;
1038
1039         set_bit(STATE_SENT, &connection->flags);
1040
1041         rcu_read_lock();
1042         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1043                 struct drbd_device *device = peer_device->device;
1044                 kref_get(&device->kref);
1045                 rcu_read_unlock();
1046
1047                 /* Prevent a race between resync-handshake and
1048                  * being promoted to Primary.
1049                  *
1050                  * Grab and release the state mutex, so we know that any current
1051                  * drbd_set_role() is finished, and any incoming drbd_set_role
1052                  * will see the STATE_SENT flag, and wait for it to be cleared.
1053                  */
1054                 mutex_lock(device->state_mutex);
1055                 mutex_unlock(device->state_mutex);
1056
1057                 if (discard_my_data)
1058                         set_bit(DISCARD_MY_DATA, &device->flags);
1059                 else
1060                         clear_bit(DISCARD_MY_DATA, &device->flags);
1061
1062                 drbd_connected(peer_device);
1063                 kref_put(&device->kref, drbd_destroy_device);
1064                 rcu_read_lock();
1065         }
1066         rcu_read_unlock();
1067
1068         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1069         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1070                 clear_bit(STATE_SENT, &connection->flags);
1071                 return 0;
1072         }
1073
1074         drbd_thread_start(&connection->asender);
1075
1076         mutex_lock(&connection->resource->conf_update);
1077         /* The discard_my_data flag is a single-shot modifier to the next
1078          * connection attempt, the handshake of which is now well underway.
1079          * No need for rcu style copying of the whole struct
1080          * just to clear a single value. */
1081         connection->net_conf->discard_my_data = 0;
1082         mutex_unlock(&connection->resource->conf_update);
1083
1084         return h;
1085
1086 out_release_sockets:
1087         if (ad.s_listen)
1088                 sock_release(ad.s_listen);
1089         if (sock.socket)
1090                 sock_release(sock.socket);
1091         if (msock.socket)
1092                 sock_release(msock.socket);
1093         return -1;
1094 }
1095
1096 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1097 {
1098         unsigned int header_size = drbd_header_size(connection);
1099
1100         if (header_size == sizeof(struct p_header100) &&
1101             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1102                 struct p_header100 *h = header;
1103                 if (h->pad != 0) {
1104                         drbd_err(connection, "Header padding is not zero\n");
1105                         return -EINVAL;
1106                 }
1107                 pi->vnr = be16_to_cpu(h->volume);
1108                 pi->cmd = be16_to_cpu(h->command);
1109                 pi->size = be32_to_cpu(h->length);
1110         } else if (header_size == sizeof(struct p_header95) &&
1111                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1112                 struct p_header95 *h = header;
1113                 pi->cmd = be16_to_cpu(h->command);
1114                 pi->size = be32_to_cpu(h->length);
1115                 pi->vnr = 0;
1116         } else if (header_size == sizeof(struct p_header80) &&
1117                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1118                 struct p_header80 *h = header;
1119                 pi->cmd = be16_to_cpu(h->command);
1120                 pi->size = be16_to_cpu(h->length);
1121                 pi->vnr = 0;
1122         } else {
1123                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1124                          be32_to_cpu(*(__be32 *)header),
1125                          connection->agreed_pro_version);
1126                 return -EINVAL;
1127         }
1128         pi->data = header + header_size;
1129         return 0;
1130 }
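/*
 * Summary (editorial) of the header variants decoded above:
 *
 *	p_header100 (protocol >= 100): 32-bit magic DRBD_MAGIC_100;
 *		carries a 16-bit volume number, 16-bit command, 32-bit length.
 *	p_header95: 16-bit magic DRBD_MAGIC_BIG; 16-bit command, 32-bit length.
 *	p_header80: 32-bit magic DRBD_MAGIC; 16-bit command, 16-bit length.
 *
 * Only the protocol-100 header carries a volume number; for the older
 * variants pi->vnr is forced to 0, and pi->data always points just past
 * the header inside the receive buffer.
 */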
1131
1132 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1133 {
1134         void *buffer = connection->data.rbuf;
1135         int err;
1136
1137         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1138         if (err)
1139                 return err;
1140
1141         err = decode_header(connection, buffer, pi);
1142         connection->last_received = jiffies;
1143
1144         return err;
1145 }
1146
1147 static void drbd_flush(struct drbd_connection *connection)
1148 {
1149         int rv;
1150         struct drbd_peer_device *peer_device;
1151         int vnr;
1152
1153         if (connection->write_ordering >= WO_bdev_flush) {
1154                 rcu_read_lock();
1155                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1156                         struct drbd_device *device = peer_device->device;
1157
1158                         if (!get_ldev(device))
1159                                 continue;
1160                         kref_get(&device->kref);
1161                         rcu_read_unlock();
1162
1163                         rv = blkdev_issue_flush(device->ldev->backing_bdev,
1164                                         GFP_NOIO, NULL);
1165                         if (rv) {
1166                                 drbd_info(device, "local disk flush failed with status %d\n", rv);
1167                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1168                                  * don't try again for ANY return value != 0
1169                                  * if (rv == -EOPNOTSUPP) */
1170                                 drbd_bump_write_ordering(connection, WO_drain_io);
1171                         }
1172                         put_ldev(device);
1173                         kref_put(&device->kref, drbd_destroy_device);
1174
1175                         rcu_read_lock();
1176                         if (rv)
1177                                 break;
1178                 }
1179                 rcu_read_unlock();
1180         }
1181 }
1182
1183 /**
1184  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishes it
1185  * @connection: DRBD connection.
1186  * @epoch:      Epoch object.
1187  * @ev:         Epoch event.
1188  */
1189 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1190                                                struct drbd_epoch *epoch,
1191                                                enum epoch_event ev)
1192 {
1193         int epoch_size;
1194         struct drbd_epoch *next_epoch;
1195         enum finish_epoch rv = FE_STILL_LIVE;
1196
1197         spin_lock(&connection->epoch_lock);
1198         do {
1199                 next_epoch = NULL;
1200
1201                 epoch_size = atomic_read(&epoch->epoch_size);
1202
1203                 switch (ev & ~EV_CLEANUP) {
1204                 case EV_PUT:
1205                         atomic_dec(&epoch->active);
1206                         break;
1207                 case EV_GOT_BARRIER_NR:
1208                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1209                         break;
1210                 case EV_BECAME_LAST:
1211                         /* nothing to do */
1212                         break;
1213                 }
1214
1215                 if (epoch_size != 0 &&
1216                     atomic_read(&epoch->active) == 0 &&
1217                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1218                         if (!(ev & EV_CLEANUP)) {
1219                                 spin_unlock(&connection->epoch_lock);
1220                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1221                                 spin_lock(&connection->epoch_lock);
1222                         }
1223 #if 0
1224                         /* FIXME: dec unacked on connection, once we have
1225                          * something to count pending connection packets in. */
1226                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1227                                 dec_unacked(epoch->connection);
1228 #endif
1229
1230                         if (connection->current_epoch != epoch) {
1231                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1232                                 list_del(&epoch->list);
1233                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1234                                 connection->epochs--;
1235                                 kfree(epoch);
1236
1237                                 if (rv == FE_STILL_LIVE)
1238                                         rv = FE_DESTROYED;
1239                         } else {
1240                                 epoch->flags = 0;
1241                                 atomic_set(&epoch->epoch_size, 0);
1242                                 /* atomic_set(&epoch->active, 0); is already zero */
1243                                 if (rv == FE_STILL_LIVE)
1244                                         rv = FE_RECYCLED;
1245                         }
1246                 }
1247
1248                 if (!next_epoch)
1249                         break;
1250
1251                 epoch = next_epoch;
1252         } while (1);
1253
1254         spin_unlock(&connection->epoch_lock);
1255
1256         return rv;
1257 }
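/*
 * Note (editorial): an epoch is considered finished once all three hold:
 * it contains at least one request (epoch_size != 0), no request is still
 * in flight (active == 0), and its barrier number has been received (or we
 * are cleaning up, EV_CLEANUP). The newest epoch is recycled in place
 * rather than freed, so connection->current_epoch always stays valid.
 */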
1258
1259 /**
1260  * drbd_bump_write_ordering() - Fall back to another write ordering method
1261  * @connection: DRBD connection.
1262  * @wo:         Write ordering method to try.
1263  */
1264 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1265 {
1266         struct disk_conf *dc;
1267         struct drbd_peer_device *peer_device;
1268         enum write_ordering_e pwo;
1269         int vnr;
1270         static char *write_ordering_str[] = {
1271                 [WO_none] = "none",
1272                 [WO_drain_io] = "drain",
1273                 [WO_bdev_flush] = "flush",
1274         };
1275
1276         pwo = connection->write_ordering;
1277         wo = min(pwo, wo);
1278         rcu_read_lock();
1279         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1280                 struct drbd_device *device = peer_device->device;
1281
1282                 if (!get_ldev_if_state(device, D_ATTACHING))
1283                         continue;
1284                 dc = rcu_dereference(device->ldev->disk_conf);
1285
1286                 if (wo == WO_bdev_flush && !dc->disk_flushes)
1287                         wo = WO_drain_io;
1288                 if (wo == WO_drain_io && !dc->disk_drain)
1289                         wo = WO_none;
1290                 put_ldev(device);
1291         }
1292         rcu_read_unlock();
1293         connection->write_ordering = wo;
1294         if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1295                 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1296 }
1297
1298 /**
1299  * drbd_submit_peer_request() - Submit the bios for a peer request to the local disk
1300  * @device:     DRBD device.
1301  * @peer_req:   peer request
1302  * @rw:         flag field, see bio->bi_rw
 *  @fault_type: fault injection type, passed on to drbd_generic_make_request()
1303  *
1304  * May spread the pages to multiple bios,
1305  * depending on bio_add_page restrictions.
1306  *
1307  * Returns 0 if all bios have been submitted,
1308  * -ENOMEM if we could not allocate enough bios,
1309  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1310  *  single page to an empty bio (which should never happen and likely indicates
1311  *  that the lower level IO stack is in some way broken). This has been observed
1312  *  on certain Xen deployments.
1313  */
1314 /* TODO allocate from our own bio_set. */
1315 int drbd_submit_peer_request(struct drbd_device *device,
1316                              struct drbd_peer_request *peer_req,
1317                              const unsigned rw, const int fault_type)
1318 {
1319         struct bio *bios = NULL;
1320         struct bio *bio;
1321         struct page *page = peer_req->pages;
1322         sector_t sector = peer_req->i.sector;
1323         unsigned ds = peer_req->i.size;
1324         unsigned n_bios = 0;
1325         unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1326         int err = -ENOMEM;
1327
1328         /* In most cases, we will only need one bio.  But in case the lower
1329          * level restrictions happen to be different at this offset on this
1330          * side than those of the sending peer, we may need to submit the
1331          * request in more than one bio.
1332          *
1333          * Plain bio_alloc is good enough here, this is no DRBD internally
1334          * generated bio, but a bio allocated on behalf of the peer.
1335          */
1336 next_bio:
1337         bio = bio_alloc(GFP_NOIO, nr_pages);
1338         if (!bio) {
1339                 drbd_err(device, "submit_ee: Allocation of a bio failed\n");
1340                 goto fail;
1341         }
1342         /* > peer_req->i.sector, unless this is the first bio */
1343         bio->bi_iter.bi_sector = sector;
1344         bio->bi_bdev = device->ldev->backing_bdev;
1345         bio->bi_rw = rw;
1346         bio->bi_private = peer_req;
1347         bio->bi_end_io = drbd_peer_request_endio;
1348
1349         bio->bi_next = bios;
1350         bios = bio;
1351         ++n_bios;
1352
1353         page_chain_for_each(page) {
1354                 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1355                 if (!bio_add_page(bio, page, len, 0)) {
1356                         /* A single page must always be possible!
1357                          * But in case it fails anyways,
1358                          * we deal with it, and complain (below). */
1359                         if (bio->bi_vcnt == 0) {
1360                                 drbd_err(device,
1361                                         "bio_add_page failed for len=%u, "
1362                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1363                                         len, (uint64_t)bio->bi_iter.bi_sector);
1364                                 err = -ENOSPC;
1365                                 goto fail;
1366                         }
1367                         goto next_bio;
1368                 }
1369                 ds -= len;
1370                 sector += len >> 9;
1371                 --nr_pages;
1372         }
1373         D_ASSERT(device, page == NULL);
1374         D_ASSERT(device, ds == 0);
1375
1376         atomic_set(&peer_req->pending_bios, n_bios);
1377         do {
1378                 bio = bios;
1379                 bios = bios->bi_next;
1380                 bio->bi_next = NULL;
1381
1382                 drbd_generic_make_request(device, fault_type, bio);
1383         } while (bios);
1384         return 0;
1385
1386 fail:
1387         while (bios) {
1388                 bio = bios;
1389                 bios = bios->bi_next;
1390                 bio_put(bio);
1391         }
1392         return err;
1393 }
1394
1395 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1396                                              struct drbd_peer_request *peer_req)
1397 {
1398         struct drbd_interval *i = &peer_req->i;
1399
1400         drbd_remove_interval(&device->write_requests, i);
1401         drbd_clear_interval(i);
1402
1403         /* Wake up any processes waiting for this peer request to complete.  */
1404         if (i->waiting)
1405                 wake_up(&device->misc_wait);
1406 }
1407
1408 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1409 {
1410         struct drbd_peer_device *peer_device;
1411         int vnr;
1412
1413         rcu_read_lock();
1414         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1415                 struct drbd_device *device = peer_device->device;
1416
1417                 kref_get(&device->kref);
1418                 rcu_read_unlock();
1419                 drbd_wait_ee_list_empty(device, &device->active_ee);
1420                 kref_put(&device->kref, drbd_destroy_device);
1421                 rcu_read_lock();
1422         }
1423         rcu_read_unlock();
1424 }
1425
1426 static struct drbd_peer_device *
1427 conn_peer_device(struct drbd_connection *connection, int volume_number)
1428 {
1429         return idr_find(&connection->peer_devices, volume_number);
1430 }
1431
1432 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1433 {
1434         int rv;
1435         struct p_barrier *p = pi->data;
1436         struct drbd_epoch *epoch;
1437
1438         /* FIXME these are unacked on connection,
1439          * not a specific (peer)device.
1440          */
1441         connection->current_epoch->barrier_nr = p->barrier;
1442         connection->current_epoch->connection = connection;
1443         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1444
1445         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1446          * the activity log, which means it would not be resynced in case the
1447          * R_PRIMARY crashes now.
1448          * Therefore we must send the barrier_ack after the barrier request was
1449          * completed. */
1450         switch (connection->write_ordering) {
1451         case WO_none:
1452                 if (rv == FE_RECYCLED)
1453                         return 0;
1454
1455                 /* receiver context, in the writeout path of the other node.
1456                  * avoid potential distributed deadlock */
1457                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1458                 if (epoch)
1459                         break;
1460                 else
1461                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1462                         /* Fall through */
1463
1464         case WO_bdev_flush:
1465         case WO_drain_io:
1466                 conn_wait_active_ee_empty(connection);
1467                 drbd_flush(connection);
1468
1469                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1470                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1471                         if (epoch)
1472                                 break;
1473                 }
1474
1475                 return 0;
1476         default:
1477                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
1478                 return -EIO;
1479         }
1480
1481         epoch->flags = 0;
1482         atomic_set(&epoch->epoch_size, 0);
1483         atomic_set(&epoch->active, 0);
1484
1485         spin_lock(&connection->epoch_lock);
1486         if (atomic_read(&connection->current_epoch->epoch_size)) {
1487                 list_add(&epoch->list, &connection->current_epoch->list);
1488                 connection->current_epoch = epoch;
1489                 connection->epochs++;
1490         } else {
1491                 /* The current_epoch got recycled while we allocated this one... */
1492                 kfree(epoch);
1493         }
1494         spin_unlock(&connection->epoch_lock);
1495
1496         return 0;
1497 }
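
/* Illustrative summary (not from the original source) of the epoch
 * accounting this function participates in: every mirrored write in
 * receive_Data() does
 *
 *	atomic_inc(&peer_req->epoch->epoch_size);
 *	atomic_inc(&peer_req->epoch->active);
 *
 * and e_end_block() later drops that reference via
 * drbd_may_finish_epoch(..., EV_PUT).  A P_BARRIER closes the current
 * epoch (EV_GOT_BARRIER_NR above); once "active" has drained to zero,
 * the epoch can be finished and the P_BARRIER_ACK sent, honoring the
 * ordering constraint described in the comment about the activity log. */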
1498
1499 /* used from receive_RSDataReply (recv_resync_read)
1500  * and from receive_Data */
1501 static struct drbd_peer_request *
1502 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1503               int data_size) __must_hold(local)
1504 {
1505         struct drbd_device *device = peer_device->device;
1506         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1507         struct drbd_peer_request *peer_req;
1508         struct page *page;
1509         int dgs, ds, err;
1510         void *dig_in = peer_device->connection->int_dig_in;
1511         void *dig_vv = peer_device->connection->int_dig_vv;
1512         unsigned long *data;
1513
1514         dgs = 0;
1515         if (peer_device->connection->peer_integrity_tfm) {
1516                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1517                 /*
1518                  * FIXME: Receive the incoming digest into the receive buffer
1519                  *        here, together with its struct p_data?
1520                  */
1521                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1522                 if (err)
1523                         return NULL;
1524                 data_size -= dgs;
1525         }
1526
1527         if (!expect(IS_ALIGNED(data_size, 512)))
1528                 return NULL;
1529         if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1530                 return NULL;
1531
1532         /* even though we trust our peer,
1533          * we sometimes have to double-check. */
1534         if (sector + (data_size>>9) > capacity) {
1535                 drbd_err(device, "request from peer beyond end of local disk: "
1536                         "capacity: %llus < sector: %llus + size: %u\n",
1537                         (unsigned long long)capacity,
1538                         (unsigned long long)sector, data_size);
1539                 return NULL;
1540         }
1541
1542         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1543          * "criss-cross" setup, that might cause write-out on some other DRBD,
1544          * which in turn might block on the other node at this very place.  */
1545         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
1546         if (!peer_req)
1547                 return NULL;
1548
1549         if (!data_size)
1550                 return peer_req;
1551
1552         ds = data_size;
1553         page = peer_req->pages;
1554         page_chain_for_each(page) {
1555                 unsigned len = min_t(int, ds, PAGE_SIZE);
1556                 data = kmap(page);
1557                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1558                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1559                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1560                         data[0] = data[0] ^ (unsigned long)-1;
1561                 }
1562                 kunmap(page);
1563                 if (err) {
1564                         drbd_free_peer_req(device, peer_req);
1565                         return NULL;
1566                 }
1567                 ds -= len;
1568         }
1569
1570         if (dgs) {
1571                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1572                 if (memcmp(dig_in, dig_vv, dgs)) {
1573                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1574                                 (unsigned long long)sector, data_size);
1575                         drbd_free_peer_req(device, peer_req);
1576                         return NULL;
1577                 }
1578         }
1579         device->recv_cnt += data_size>>9;
1580         return peer_req;
1581 }
1582
1583 /* drbd_drain_block() just takes a data block
1584  * out of the socket input buffer, and discards it.
1585  */
1586 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1587 {
1588         struct page *page;
1589         int err = 0;
1590         void *data;
1591
1592         if (!data_size)
1593                 return 0;
1594
1595         page = drbd_alloc_pages(peer_device, 1, 1);
1596
1597         data = kmap(page);
1598         while (data_size) {
1599                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1600
1601                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1602                 if (err)
1603                         break;
1604                 data_size -= len;
1605         }
1606         kunmap(page);
1607         drbd_free_pages(peer_device->device, page, 0);
1608         return err;
1609 }
1610
1611 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1612                            sector_t sector, int data_size)
1613 {
1614         struct bio_vec bvec;
1615         struct bvec_iter iter;
1616         struct bio *bio;
1617         int dgs, err, expect;
1618         void *dig_in = peer_device->connection->int_dig_in;
1619         void *dig_vv = peer_device->connection->int_dig_vv;
1620
1621         dgs = 0;
1622         if (peer_device->connection->peer_integrity_tfm) {
1623                 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1624                 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1625                 if (err)
1626                         return err;
1627                 data_size -= dgs;
1628         }
1629
1630         /* optimistically update recv_cnt.  if receiving fails below,
1631          * we disconnect anyways, and counters will be reset. */
1632         peer_device->device->recv_cnt += data_size>>9;
1633
1634         bio = req->master_bio;
1635         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1636
1637         bio_for_each_segment(bvec, bio, iter) {
1638                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1639                 expect = min_t(int, data_size, bvec.bv_len);
1640                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1641                 kunmap(bvec.bv_page);
1642                 if (err)
1643                         return err;
1644                 data_size -= expect;
1645         }
1646
1647         if (dgs) {
1648                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1649                 if (memcmp(dig_in, dig_vv, dgs)) {
1650                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1651                         return -EINVAL;
1652                 }
1653         }
1654
1655         D_ASSERT(peer_device->device, data_size == 0);
1656         return 0;
1657 }
1658
1659 /*
1660  * e_end_resync_block() is called in asender context via
1661  * drbd_finish_peer_reqs().
1662  */
1663 static int e_end_resync_block(struct drbd_work *w, int unused)
1664 {
1665         struct drbd_peer_request *peer_req =
1666                 container_of(w, struct drbd_peer_request, w);
1667         struct drbd_peer_device *peer_device = peer_req->peer_device;
1668         struct drbd_device *device = peer_device->device;
1669         sector_t sector = peer_req->i.sector;
1670         int err;
1671
1672         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1673
1674         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1675                 drbd_set_in_sync(device, sector, peer_req->i.size);
1676                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1677         } else {
1678                 /* Record failure to sync */
1679                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1680
1681                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1682         }
1683         dec_unacked(device);
1684
1685         return err;
1686 }
1687
1688 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1689                             int data_size) __releases(local)
1690 {
1691         struct drbd_device *device = peer_device->device;
1692         struct drbd_peer_request *peer_req;
1693
1694         peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
1695         if (!peer_req)
1696                 goto fail;
1697
1698         dec_rs_pending(device);
1699
1700         inc_unacked(device);
1701         /* corresponding dec_unacked() in e_end_resync_block(),
1702          * or in _drbd_clear_done_ee(), respectively */
1703
1704         peer_req->w.cb = e_end_resync_block;
1705
1706         spin_lock_irq(&device->resource->req_lock);
1707         list_add(&peer_req->w.list, &device->sync_ee);
1708         spin_unlock_irq(&device->resource->req_lock);
1709
1710         atomic_add(data_size >> 9, &device->rs_sect_ev);
1711         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1712                 return 0;
1713
1714         /* don't care for the reason here */
1715         drbd_err(device, "submit failed, triggering re-connect\n");
1716         spin_lock_irq(&device->resource->req_lock);
1717         list_del(&peer_req->w.list);
1718         spin_unlock_irq(&device->resource->req_lock);
1719
1720         drbd_free_peer_req(device, peer_req);
1721 fail:
1722         put_ldev(device);
1723         return -EIO;
1724 }
1725
1726 static struct drbd_request *
1727 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1728              sector_t sector, bool missing_ok, const char *func)
1729 {
1730         struct drbd_request *req;
1731
1732         /* Request object according to our peer */
1733         req = (struct drbd_request *)(unsigned long)id;
1734         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1735                 return req;
1736         if (!missing_ok) {
1737                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1738                         (unsigned long)id, (unsigned long long)sector);
1739         }
1740         return NULL;
1741 }
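
/* Note (illustrative, not from the original source): the 64-bit block_id
 * carried in the packet is literally the kernel address of our own
 * struct drbd_request, handed to the peer with the data request and
 * echoed back in the reply.  The cast above merely recovers the pointer;
 * drbd_contains_interval() then validates it against the interval tree
 * before anything is dereferenced, so a stale or corrupted id is
 * rejected rather than followed blindly. */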
1742
1743 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1744 {
1745         struct drbd_peer_device *peer_device;
1746         struct drbd_device *device;
1747         struct drbd_request *req;
1748         sector_t sector;
1749         int err;
1750         struct p_data *p = pi->data;
1751
1752         peer_device = conn_peer_device(connection, pi->vnr);
1753         if (!peer_device)
1754                 return -EIO;
1755         device = peer_device->device;
1756
1757         sector = be64_to_cpu(p->sector);
1758
1759         spin_lock_irq(&device->resource->req_lock);
1760         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1761         spin_unlock_irq(&device->resource->req_lock);
1762         if (unlikely(!req))
1763                 return -EIO;
1764
1765         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1766          * special casing it there for the various failure cases.
1767          * still no race with drbd_fail_pending_reads */
1768         err = recv_dless_read(peer_device, req, sector, pi->size);
1769         if (!err)
1770                 req_mod(req, DATA_RECEIVED);
1771         /* else: nothing. handled from drbd_disconnect...
1772          * I don't think we may complete this just yet
1773          * in case we are "on-disconnect: freeze" */
1774
1775         return err;
1776 }
1777
1778 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1779 {
1780         struct drbd_peer_device *peer_device;
1781         struct drbd_device *device;
1782         sector_t sector;
1783         int err;
1784         struct p_data *p = pi->data;
1785
1786         peer_device = conn_peer_device(connection, pi->vnr);
1787         if (!peer_device)
1788                 return -EIO;
1789         device = peer_device->device;
1790
1791         sector = be64_to_cpu(p->sector);
1792         D_ASSERT(device, p->block_id == ID_SYNCER);
1793
1794         if (get_ldev(device)) {
1795                 /* data is submitted to disk within recv_resync_read.
1796                  * corresponding put_ldev done below on error,
1797                  * or in drbd_peer_request_endio. */
1798                 err = recv_resync_read(peer_device, sector, pi->size);
1799         } else {
1800                 if (__ratelimit(&drbd_ratelimit_state))
1801                         drbd_err(device, "Can not write resync data to local disk.\n");
1802
1803                 err = drbd_drain_block(peer_device, pi->size);
1804
1805                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1806         }
1807
1808         atomic_add(pi->size >> 9, &device->rs_sect_in);
1809
1810         return err;
1811 }
1812
1813 static void restart_conflicting_writes(struct drbd_device *device,
1814                                        sector_t sector, int size)
1815 {
1816         struct drbd_interval *i;
1817         struct drbd_request *req;
1818
1819         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1820                 if (!i->local)
1821                         continue;
1822                 req = container_of(i, struct drbd_request, i);
1823                 if (req->rq_state & RQ_LOCAL_PENDING ||
1824                     !(req->rq_state & RQ_POSTPONED))
1825                         continue;
1826                 /* as it is RQ_POSTPONED, this will cause it to
1827                  * be queued on the retry workqueue. */
1828                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1829         }
1830 }
1831
1832 /*
1833  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1834  */
1835 static int e_end_block(struct drbd_work *w, int cancel)
1836 {
1837         struct drbd_peer_request *peer_req =
1838                 container_of(w, struct drbd_peer_request, w);
1839         struct drbd_peer_device *peer_device = peer_req->peer_device;
1840         struct drbd_device *device = peer_device->device;
1841         sector_t sector = peer_req->i.sector;
1842         int err = 0, pcmd;
1843
1844         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1845                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1846                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1847                                 device->state.conn <= C_PAUSED_SYNC_T &&
1848                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1849                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1850                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1851                         if (pcmd == P_RS_WRITE_ACK)
1852                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1853                 } else {
1854                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1855                         /* we expect it to be marked out of sync anyways...
1856                          * maybe assert this?  */
1857                 }
1858                 dec_unacked(device);
1859         }
1860         /* we delete from the conflict detection hash _after_ we sent out the
1861          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1862         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1863                 spin_lock_irq(&device->resource->req_lock);
1864                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1865                 drbd_remove_epoch_entry_interval(device, peer_req);
1866                 if (peer_req->flags & EE_RESTART_REQUESTS)
1867                         restart_conflicting_writes(device, sector, peer_req->i.size);
1868                 spin_unlock_irq(&device->resource->req_lock);
1869         } else
1870                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1871
1872         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1873
1874         return err;
1875 }
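
/* Ack selection above, illustrated (not from the original source): when
 * EE_SEND_WRITE_ACK is set (protocol C), a successful write that may
 * also satisfy a running resync (EE_MAY_SET_IN_SYNC while our state is
 * between C_SYNC_SOURCE and C_PAUSED_SYNC_T) is answered with
 * P_RS_WRITE_ACK and the area is marked in sync; any other successful
 * write gets a plain P_WRITE_ACK, and a failed one a P_NEG_ACK. */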
1876
1877 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1878 {
1879         struct drbd_peer_request *peer_req =
1880                 container_of(w, struct drbd_peer_request, w);
1881         struct drbd_peer_device *peer_device = peer_req->peer_device;
1882         int err;
1883
1884         err = drbd_send_ack(peer_device, ack, peer_req);
1885         dec_unacked(peer_device->device);
1886
1887         return err;
1888 }
1889
1890 static int e_send_superseded(struct drbd_work *w, int unused)
1891 {
1892         return e_send_ack(w, P_SUPERSEDED);
1893 }
1894
1895 static int e_send_retry_write(struct drbd_work *w, int unused)
1896 {
1897         struct drbd_peer_request *peer_req =
1898                 container_of(w, struct drbd_peer_request, w);
1899         struct drbd_connection *connection = peer_req->peer_device->connection;
1900
1901         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1902                              P_RETRY_WRITE : P_SUPERSEDED);
1903 }
1904
1905 static bool seq_greater(u32 a, u32 b)
1906 {
1907         /*
1908          * We assume 32-bit wrap-around here.
1909          * For 24-bit wrap-around, we would have to shift:
1910          *  a <<= 8; b <<= 8;
1911          */
1912         return (s32)a - (s32)b > 0;
1913 }
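
/* Wrap-around examples (illustrative):
 *
 *	seq_greater(5, 3);           -> (s32)2  > 0 -> true
 *	seq_greater(3, 5);           -> (s32)-2 > 0 -> false
 *	seq_greater(2, 0xfffffffe);  -> 2 - (-2) = 4 -> true
 *
 * The last case is the point of the signed subtraction: a sequence
 * number that has just wrapped past 0 still compares as "greater" than
 * one from shortly before the wrap. */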
1914
1915 static u32 seq_max(u32 a, u32 b)
1916 {
1917         return seq_greater(a, b) ? a : b;
1918 }
1919
1920 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1921 {
1922         struct drbd_device *device = peer_device->device;
1923         unsigned int newest_peer_seq;
1924
1925         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1926                 spin_lock(&device->peer_seq_lock);
1927                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1928                 device->peer_seq = newest_peer_seq;
1929                 spin_unlock(&device->peer_seq_lock);
1930                 /* wake up only if we actually changed device->peer_seq */
1931                 if (peer_seq == newest_peer_seq)
1932                         wake_up(&device->seq_wait);
1933         }
1934 }
1935
1936 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1937 {
1938         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1939 }
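
/* Example (illustrative): two 4 KiB requests starting at sectors 8 and
 * 12 overlap, since with l1 == l2 == 4096,
 *	!((8 + 8 <= 12) || (8 >= 12 + 8)) == true,
 * i.e. the sector ranges [8, 16) and [12, 20) share sectors 12..15. */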
1940
1941 /* maybe change sync_ee into interval trees as well? */
1942 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1943 {
1944         struct drbd_peer_request *rs_req;
1945         bool rv = false;
1946
1947         spin_lock_irq(&device->resource->req_lock);
1948         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1949                 if (overlaps(peer_req->i.sector, peer_req->i.size,
1950                              rs_req->i.sector, rs_req->i.size)) {
1951                         rv = true;
1952                         break;
1953                 }
1954         }
1955         spin_unlock_irq(&device->resource->req_lock);
1956
1957         return rv;
1958 }
1959
1960 /* Called from receive_Data.
1961  * Synchronize packets on sock with packets on msock.
1962  *
1963  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1964  * packet traveling on msock, they are still processed in the order they have
1965  * been sent.
1966  *
1967  * Note: we don't care for Ack packets overtaking P_DATA packets.
1968  *
1969  * In case peer_seq is larger than device->peer_seq, there are
1970  * outstanding packets on the msock. We wait for them to arrive.
1971  * In case we are the logically next packet, we update device->peer_seq
1972  * ourselves. Correctly handles 32bit wrap around.
1973  *
1974  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1975  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1976  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1977  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1978  *
1979  * returns 0 if we may process the packet,
1980  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1981 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
1982 {
1983         struct drbd_device *device = peer_device->device;
1984         DEFINE_WAIT(wait);
1985         long timeout;
1986         int ret = 0, tp;
1987
1988         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
1989                 return 0;
1990
1991         spin_lock(&device->peer_seq_lock);
1992         for (;;) {
1993                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1994                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
1995                         break;
1996                 }
1997
1998                 if (signal_pending(current)) {
1999                         ret = -ERESTARTSYS;
2000                         break;
2001                 }
2002
2003                 rcu_read_lock();
2004                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2005                 rcu_read_unlock();
2006
2007                 if (!tp)
2008                         break;
2009
2010                 /* Only need to wait if two_primaries is enabled */
2011                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2012                 spin_unlock(&device->peer_seq_lock);
2013                 rcu_read_lock();
2014                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2015                 rcu_read_unlock();
2016                 timeout = schedule_timeout(timeout);
2017                 spin_lock(&device->peer_seq_lock);
2018                 if (!timeout) {
2019                         ret = -ETIMEDOUT;
2020                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2021                         break;
2022                 }
2023         }
2024         spin_unlock(&device->peer_seq_lock);
2025         finish_wait(&device->seq_wait, &wait);
2026         return ret;
2027 }
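
/* Illustration (not from the original source): the loop above lets a
 * packet pass as soon as peer_seq <= device->peer_seq + 1 (note the
 * peer_seq - 1 in the seq_greater() test).  With device->peer_seq == 41,
 * a packet with seq 42 is the logically next one and proceeds at once,
 * while one with seq 44 sleeps on seq_wait until the sequence numbers
 * 42 and 43 have been accounted for. */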
2028
2029 /* see also bio_flags_to_wire()
2030  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2031  * flags and back. We may replicate to other kernel versions. */
2032 static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
2033 {
2034         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2035                 (dpf & DP_FUA ? REQ_FUA : 0) |
2036                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2037                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2038 }
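
/* Illustration (not from the original source): a peer write sent with
 * DP_RW_SYNC | DP_FUA in p_data.dp_flags is resubmitted locally with
 * the matching request flags restored:
 *
 *	rw = WRITE | wire_flags_to_bio(device, DP_RW_SYNC | DP_FUA);
 *	(rw == WRITE | REQ_SYNC | REQ_FUA)
 *
 * so the SYNC/FUA/FLUSH/DISCARD semantics of the submitter's original
 * bio survive the trip over the wire; see receive_Data() below. */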
2039
2040 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2041                                     unsigned int size)
2042 {
2043         struct drbd_interval *i;
2044
2045     repeat:
2046         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2047                 struct drbd_request *req;
2048                 struct bio_and_error m;
2049
2050                 if (!i->local)
2051                         continue;
2052                 req = container_of(i, struct drbd_request, i);
2053                 if (!(req->rq_state & RQ_POSTPONED))
2054                         continue;
2055                 req->rq_state &= ~RQ_POSTPONED;
2056                 __req_mod(req, NEG_ACKED, &m);
2057                 spin_unlock_irq(&device->resource->req_lock);
2058                 if (m.bio)
2059                         complete_master_bio(device, &m);
2060                 spin_lock_irq(&device->resource->req_lock);
2061                 goto repeat;
2062         }
2063 }
2064
2065 static int handle_write_conflicts(struct drbd_device *device,
2066                                   struct drbd_peer_request *peer_req)
2067 {
2068         struct drbd_connection *connection = first_peer_device(device)->connection;
2069         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2070         sector_t sector = peer_req->i.sector;
2071         const unsigned int size = peer_req->i.size;
2072         struct drbd_interval *i;
2073         bool equal;
2074         int err;
2075
2076         /*
2077          * Inserting the peer request into the write_requests tree will prevent
2078          * new conflicting local requests from being added.
2079          */
2080         drbd_insert_interval(&device->write_requests, &peer_req->i);
2081
2082     repeat:
2083         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2084                 if (i == &peer_req->i)
2085                         continue;
2086
2087                 if (!i->local) {
2088                         /*
2089                          * Our peer has sent a conflicting remote request; this
2090                          * should not happen in a two-node setup.  Wait for the
2091                          * earlier peer request to complete.
2092                          */
2093                         err = drbd_wait_misc(device, i);
2094                         if (err)
2095                                 goto out;
2096                         goto repeat;
2097                 }
2098
2099                 equal = i->sector == sector && i->size == size;
2100                 if (resolve_conflicts) {
2101                         /*
2102                          * If the peer request is fully contained within the
2103                          * overlapping request, it can be considered overwritten
2104                          * and thus superseded; otherwise, it will be retried
2105                          * once all overlapping requests have completed.
2106                          */
2107                         bool superseded = i->sector <= sector && i->sector +
2108                                        (i->size >> 9) >= sector + (size >> 9);
2109
2110                         if (!equal)
2111                                 drbd_alert(device, "Concurrent writes detected: "
2112                                                "local=%llus +%u, remote=%llus +%u, "
2113                                                "assuming %s came first\n",
2114                                           (unsigned long long)i->sector, i->size,
2115                                           (unsigned long long)sector, size,
2116                                           superseded ? "local" : "remote");
2117
2118                         inc_unacked(device);
2119                         peer_req->w.cb = superseded ? e_send_superseded :
2120                                                    e_send_retry_write;
2121                         list_add_tail(&peer_req->w.list, &device->done_ee);
2122                         wake_asender(connection);
2123
2124                         err = -ENOENT;
2125                         goto out;
2126                 } else {
2127                         struct drbd_request *req =
2128                                 container_of(i, struct drbd_request, i);
2129
2130                         if (!equal)
2131                                 drbd_alert(device, "Concurrent writes detected: "
2132                                                "local=%llus +%u, remote=%llus +%u\n",
2133                                           (unsigned long long)i->sector, i->size,
2134                                           (unsigned long long)sector, size);
2135
2136                         if (req->rq_state & RQ_LOCAL_PENDING ||
2137                             !(req->rq_state & RQ_POSTPONED)) {
2138                                 /*
2139                                  * Wait for the node with the discard flag to
2140                                  * decide if this request has been superseded
2141                                  * or needs to be retried.
2142                                  * Requests that have been superseded will
2143                                  * disappear from the write_requests tree.
2144                                  *
2145                                  * In addition, wait for the conflicting
2146                                  * request to finish locally before submitting
2147                                  * the conflicting peer request.
2148                                  */
2149                                 err = drbd_wait_misc(device, &req->i);
2150                                 if (err) {
2151                                         _conn_request_state(connection,
2152                                                             NS(conn, C_TIMEOUT),
2153                                                             CS_HARD);
2154                                         fail_postponed_requests(device, sector, size);
2155                                         goto out;
2156                                 }
2157                                 goto repeat;
2158                         }
2159                         /*
2160                          * Remember to restart the conflicting requests after
2161                          * the new peer request has completed.
2162                          */
2163                         peer_req->flags |= EE_RESTART_REQUESTS;
2164                 }
2165         }
2166         err = 0;
2167
2168     out:
2169         if (err)
2170                 drbd_remove_epoch_entry_interval(device, peer_req);
2171         return err;
2172 }
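
/* Containment example for the "superseded" test above (illustrative):
 * a local write i covering sectors [64, 80) conflicts with an incoming
 * peer write covering [68, 72).  Since
 *	i->sector (64) <= sector (68) &&
 *	64 + 16 (80)   >= 68 + 4 (72),
 * the peer request lies entirely inside the local one and is answered
 * with P_SUPERSEDED; a merely partial overlap would instead be retried
 * via e_send_retry_write once the overlapping requests complete. */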
2173
2174 /* mirrored write */
2175 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2176 {
2177         struct drbd_peer_device *peer_device;
2178         struct drbd_device *device;
2179         sector_t sector;
2180         struct drbd_peer_request *peer_req;
2181         struct p_data *p = pi->data;
2182         u32 peer_seq = be32_to_cpu(p->seq_num);
2183         int rw = WRITE;
2184         u32 dp_flags;
2185         int err, tp;
2186
2187         peer_device = conn_peer_device(connection, pi->vnr);
2188         if (!peer_device)
2189                 return -EIO;
2190         device = peer_device->device;
2191
2192         if (!get_ldev(device)) {
2193                 int err2;
2194
2195                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2196                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2197                 atomic_inc(&connection->current_epoch->epoch_size);
2198                 err2 = drbd_drain_block(peer_device, pi->size);
2199                 if (!err)
2200                         err = err2;
2201                 return err;
2202         }
2203
2204         /*
2205          * Corresponding put_ldev done either below (on various errors), or in
2206          * drbd_peer_request_endio, if we successfully submit the data at the
2207          * end of this function.
2208          */
2209
2210         sector = be64_to_cpu(p->sector);
2211         peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
2212         if (!peer_req) {
2213                 put_ldev(device);
2214                 return -EIO;
2215         }
2216
2217         peer_req->w.cb = e_end_block;
2218
2219         dp_flags = be32_to_cpu(p->dp_flags);
2220         rw |= wire_flags_to_bio(device, dp_flags);
2221         if (peer_req->pages == NULL) {
2222                 D_ASSERT(device, peer_req->i.size == 0);
2223                 D_ASSERT(device, dp_flags & DP_FLUSH);
2224         }
2225
2226         if (dp_flags & DP_MAY_SET_IN_SYNC)
2227                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2228
2229         spin_lock(&connection->epoch_lock);
2230         peer_req->epoch = connection->current_epoch;
2231         atomic_inc(&peer_req->epoch->epoch_size);
2232         atomic_inc(&peer_req->epoch->active);
2233         spin_unlock(&connection->epoch_lock);
2234
2235         rcu_read_lock();
2236         tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2237         rcu_read_unlock();
2238         if (tp) {
2239                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2240                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2241                 if (err)
2242                         goto out_interrupted;
2243                 spin_lock_irq(&device->resource->req_lock);
2244                 err = handle_write_conflicts(device, peer_req);
2245                 if (err) {
2246                         spin_unlock_irq(&device->resource->req_lock);
2247                         if (err == -ENOENT) {
2248                                 put_ldev(device);
2249                                 return 0;
2250                         }
2251                         goto out_interrupted;
2252                 }
2253         } else {
2254                 update_peer_seq(peer_device, peer_seq);
2255                 spin_lock_irq(&device->resource->req_lock);
2256         }
2257         list_add(&peer_req->w.list, &device->active_ee);
2258         spin_unlock_irq(&device->resource->req_lock);
2259
2260         if (device->state.conn == C_SYNC_TARGET)
2261                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2262
2263         if (peer_device->connection->agreed_pro_version < 100) {
2264                 rcu_read_lock();
2265                 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2266                 case DRBD_PROT_C:
2267                         dp_flags |= DP_SEND_WRITE_ACK;
2268                         break;
2269                 case DRBD_PROT_B:
2270                         dp_flags |= DP_SEND_RECEIVE_ACK;
2271                         break;
2272                 }
2273                 rcu_read_unlock();
2274         }
2275
2276         if (dp_flags & DP_SEND_WRITE_ACK) {
2277                 peer_req->flags |= EE_SEND_WRITE_ACK;
2278                 inc_unacked(device);
2279                 /* corresponding dec_unacked() in e_end_block(),
2280                  * or in _drbd_clear_done_ee(), respectively */
2281         }
2282
2283         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2284                 /* I really don't like it that the receiver thread
2285                  * sends on the msock, but anyways */
2286                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2287         }
2288
2289         if (device->state.pdsk < D_INCONSISTENT) {
2290                 /* In case we have the only disk of the cluster, note the range out of sync and cover it by the activity log. */
2291                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2292                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2293                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2294                 drbd_al_begin_io(device, &peer_req->i, true);
2295         }
2296
2297         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2298         if (!err)
2299                 return 0;
2300
2301         /* don't care for the reason here */
2302         drbd_err(device, "submit failed, triggering re-connect\n");
2303         spin_lock_irq(&device->resource->req_lock);
2304         list_del(&peer_req->w.list);
2305         drbd_remove_epoch_entry_interval(device, peer_req);
2306         spin_unlock_irq(&device->resource->req_lock);
2307         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2308                 drbd_al_complete_io(device, &peer_req->i);
2309
2310 out_interrupted:
2311         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2312         put_ldev(device);
2313         drbd_free_peer_req(device, peer_req);
2314         return err;
2315 }
2316
2317 /* We may throttle resync, if the lower device seems to be busy,
2318  * and current sync rate is above c_min_rate.
2319  *
2320  * To decide whether or not the lower device is busy, we use a scheme similar
2321  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2322  * activity (more than 64 sectors) that we cannot account for with our own
2323  * resync activity, it obviously is "busy".
2324  *
2325  * The current sync rate used here uses only the most recent two step marks,
2326  * to have a short time average so we can react faster.
2327  */
2328 int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2329 {
2330         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2331         unsigned long db, dt, dbdt;
2332         struct lc_element *tmp;
2333         int curr_events;
2334         int throttle = 0;
2335         unsigned int c_min_rate;
2336
2337         rcu_read_lock();
2338         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2339         rcu_read_unlock();
2340
2341         /* feature disabled? */
2342         if (c_min_rate == 0)
2343                 return 0;
2344
2345         spin_lock_irq(&device->al_lock);
2346         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2347         if (tmp) {
2348                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2349                 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2350                         spin_unlock_irq(&device->al_lock);
2351                         return 0;
2352                 }
2353                 /* Do not slow down if app IO is already waiting for this extent */
2354         }
2355         spin_unlock_irq(&device->al_lock);
2356
2357         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2358                       (int)part_stat_read(&disk->part0, sectors[1]) -
2359                         atomic_read(&device->rs_sect_ev);
2360
2361         if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2362                 unsigned long rs_left;
2363                 int i;
2364
2365                 device->rs_last_events = curr_events;
2366
2367                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2368                  * approx. */
2369                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2370
2371                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2372                         rs_left = device->ov_left;
2373                 else
2374                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2375
2376                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2377                 if (!dt)
2378                         dt++;
2379                 db = device->rs_mark_left[i] - rs_left;
2380                 dbdt = Bit2KB(db/dt);
2381
2382                 if (dbdt > c_min_rate)
2383                         throttle = 1;
2384         }
2385         return throttle;
2386 }
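
/* Worked example of the rate estimate above (illustrative, assuming the
 * usual 4 KiB per resync-bitmap bit behind Bit2KB()): if the mark chosen
 * via rs_last_mark is dt = 2 seconds old and db = 8192 bitmap bits were
 * cleared since then, the short-time average is
 *	dbdt = Bit2KB(8192 / 2) = 4096 * 4 = 16384 KiB/s,
 * and we throttle iff dbdt > c_min_rate. */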
2387
2388
2389 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2390 {
2391         struct drbd_peer_device *peer_device;
2392         struct drbd_device *device;
2393         sector_t sector;
2394         sector_t capacity;
2395         struct drbd_peer_request *peer_req;
2396         struct digest_info *di = NULL;
2397         int size, verb;
2398         unsigned int fault_type;
2399         struct p_block_req *p = pi->data;
2400
2401         peer_device = conn_peer_device(connection, pi->vnr);
2402         if (!peer_device)
2403                 return -EIO;
2404         device = peer_device->device;
2405         capacity = drbd_get_capacity(device->this_bdev);
2406
2407         sector = be64_to_cpu(p->sector);
2408         size   = be32_to_cpu(p->blksize);
2409
2410         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2411                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2412                                 (unsigned long long)sector, size);
2413                 return -EINVAL;
2414         }
2415         if (sector + (size>>9) > capacity) {
2416                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2417                                 (unsigned long long)sector, size);
2418                 return -EINVAL;
2419         }
2420
2421         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2422                 verb = 1;
2423                 switch (pi->cmd) {
2424                 case P_DATA_REQUEST:
2425                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2426                         break;
2427                 case P_RS_DATA_REQUEST:
2428                 case P_CSUM_RS_REQUEST:
2429                 case P_OV_REQUEST:
2430                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2431                         break;
2432                 case P_OV_REPLY:
2433                         verb = 0;
2434                         dec_rs_pending(device);
2435                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2436                         break;
2437                 default:
2438                         BUG();
2439                 }
2440                 if (verb && __ratelimit(&drbd_ratelimit_state))
2441                         drbd_err(device, "Can not satisfy peer's read request, "
2442                             "no local data.\n");
2443
2444                 /* drain the possible payload */
2445                 return drbd_drain_block(peer_device, pi->size);
2446         }
2447
2448         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2449          * "criss-cross" setup, that might cause write-out on some other DRBD,
2450          * which in turn might block on the other node at this very place.  */
2451         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
2452         if (!peer_req) {
2453                 put_ldev(device);
2454                 return -ENOMEM;
2455         }
2456
2457         switch (pi->cmd) {
2458         case P_DATA_REQUEST:
2459                 peer_req->w.cb = w_e_end_data_req;
2460                 fault_type = DRBD_FAULT_DT_RD;
2461                 /* application IO, don't drbd_rs_begin_io */
2462                 goto submit;
2463
2464         case P_RS_DATA_REQUEST:
2465                 peer_req->w.cb = w_e_end_rsdata_req;
2466                 fault_type = DRBD_FAULT_RS_RD;
2467                 /* used in the sector offset progress display */
2468                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2469                 break;
2470
2471         case P_OV_REPLY:
2472         case P_CSUM_RS_REQUEST:
2473                 fault_type = DRBD_FAULT_RS_RD;
2474                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2475                 if (!di)
2476                         goto out_free_e;
2477
2478                 di->digest_size = pi->size;
2479                 di->digest = (((char *)di)+sizeof(struct digest_info));
2480
2481                 peer_req->digest = di;
2482                 peer_req->flags |= EE_HAS_DIGEST;
2483
2484                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2485                         goto out_free_e;
2486
2487                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2488                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2489                         peer_req->w.cb = w_e_end_csum_rs_req;
2490                         /* used in the sector offset progress display */
2491                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2492                 } else if (pi->cmd == P_OV_REPLY) {
2493                         /* track progress, we may need to throttle */
2494                         atomic_add(size >> 9, &device->rs_sect_in);
2495                         peer_req->w.cb = w_e_end_ov_reply;
2496                         dec_rs_pending(device);
2497                         /* drbd_rs_begin_io done when we sent this request,
2498                          * but accounting still needs to be done. */
2499                         goto submit_for_resync;
2500                 }
2501                 break;
2502
2503         case P_OV_REQUEST:
2504                 if (device->ov_start_sector == ~(sector_t)0 &&
2505                     peer_device->connection->agreed_pro_version >= 90) {
2506                         unsigned long now = jiffies;
2507                         int i;
2508                         device->ov_start_sector = sector;
2509                         device->ov_position = sector;
2510                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2511                         device->rs_total = device->ov_left;
2512                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2513                                 device->rs_mark_left[i] = device->ov_left;
2514                                 device->rs_mark_time[i] = now;
2515                         }
2516                         drbd_info(device, "Online Verify start sector: %llu\n",
2517                                         (unsigned long long)sector);
2518                 }
2519                 peer_req->w.cb = w_e_end_ov_req;
2520                 fault_type = DRBD_FAULT_RS_RD;
2521                 break;
2522
2523         default:
2524                 BUG();
2525         }
2526
2527         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2528          * wrt the receiver, but it is not as straightforward as it may seem.
2529          * Various places in the resync start and stop logic assume resync
2530          * requests are processed in order, requeuing this on the worker thread
2531          * introduces a bunch of new code for synchronization between threads.
2532          *
2533          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2534          * "forever", throttling after drbd_rs_begin_io will lock that extent
2535          * for application writes for the same time.  For now, just throttle
2536          * here, where the rest of the code expects the receiver to sleep for
2537          * a while, anyways.
2538          */
2539
2540         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2541          * this defers syncer requests for some time, before letting at least
2542  * one request through.  The resync controller on the receiving side
2543          * will adapt to the incoming rate accordingly.
2544          *
2545          * We cannot throttle here if remote is Primary/SyncTarget:
2546          * we would also throttle its application reads.
2547          * In that case, throttling is done on the SyncTarget only.
2548          */
2549         if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2550                 schedule_timeout_uninterruptible(HZ/10);
2551         if (drbd_rs_begin_io(device, sector))
2552                 goto out_free_e;
2553
2554 submit_for_resync:
2555         atomic_add(size >> 9, &device->rs_sect_ev);
2556
2557 submit:
2558         inc_unacked(device);
2559         spin_lock_irq(&device->resource->req_lock);
2560         list_add_tail(&peer_req->w.list, &device->read_ee);
2561         spin_unlock_irq(&device->resource->req_lock);
2562
2563         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2564                 return 0;
2565
2566         /* don't care for the reason here */
2567         drbd_err(device, "submit failed, triggering re-connect\n");
2568         spin_lock_irq(&device->resource->req_lock);
2569         list_del(&peer_req->w.list);
2570         spin_unlock_irq(&device->resource->req_lock);
2571         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2572
2573 out_free_e:
2574         put_ldev(device);
2575         drbd_free_peer_req(device, peer_req);
2576         return -EIO;
2577 }
2578
2579 /**
2580  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2581  */
2582 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2583 {
2584         struct drbd_device *device = peer_device->device;
2585         int self, peer, rv = -100;
2586         unsigned long ch_self, ch_peer;
2587         enum drbd_after_sb_p after_sb_0p;
2588
2589         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2590         peer = device->p_uuid[UI_BITMAP] & 1;
2591
2592         ch_peer = device->p_uuid[UI_SIZE];
2593         ch_self = device->comm_bm_set;
2594
2595         rcu_read_lock();
2596         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2597         rcu_read_unlock();
2598         switch (after_sb_0p) {
2599         case ASB_CONSENSUS:
2600         case ASB_DISCARD_SECONDARY:
2601         case ASB_CALL_HELPER:
2602         case ASB_VIOLENTLY:
2603                 drbd_err(device, "Configuration error.\n");
2604                 break;
2605         case ASB_DISCONNECT:
2606                 break;
2607         case ASB_DISCARD_YOUNGER_PRI:
2608                 if (self == 0 && peer == 1) {
2609                         rv = -1;
2610                         break;
2611                 }
2612                 if (self == 1 && peer == 0) {
2613                         rv =  1;
2614                         break;
2615                 }
2616                 /* Else fall through to one of the other strategies... */
2617         case ASB_DISCARD_OLDER_PRI:
2618                 if (self == 0 && peer == 1) {
2619                         rv = 1;
2620                         break;
2621                 }
2622                 if (self == 1 && peer == 0) {
2623                         rv = -1;
2624                         break;
2625                 }
2626                 /* Else fall through to one of the other strategies... */
2627                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2628                      "Using discard-least-changes instead\n");
2629         case ASB_DISCARD_ZERO_CHG:
2630                 if (ch_peer == 0 && ch_self == 0) {
2631                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2632                                 ? -1 : 1;
2633                         break;
2634                 } else {
2635                         if (ch_peer == 0) { rv =  1; break; }
2636                         if (ch_self == 0) { rv = -1; break; }
2637                 }
2638                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2639                         break;
2640         case ASB_DISCARD_LEAST_CHG:
2641                 if      (ch_self < ch_peer)
2642                         rv = -1;
2643                 else if (ch_self > ch_peer)
2644                         rv =  1;
2645                 else /* ( ch_self == ch_peer ) */
2646                      /* Well, then use something else. */
2647                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2648                                 ? -1 : 1;
2649                 break;
2650         case ASB_DISCARD_LOCAL:
2651                 rv = -1;
2652                 break;
2653         case ASB_DISCARD_REMOTE:
2654                 rv =  1;
2655         }
2656
2657         return rv;
2658 }
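
/* Illustration (not from the original source): with after-sb-0pri set to
 * discard-least-changes, suppose the peer modified ch_peer == 12 bitmap
 * blocks during the split brain while we modified ch_self == 3.  Then
 * rv == -1: we become the sync target and our (fewer) changes are
 * discarded.  The RESOLVE_CONFLICTS bit only breaks exact ties. */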
2659
2660 /**
2661  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2662  */
2663 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2664 {
2665         struct drbd_device *device = peer_device->device;
2666         int hg, rv = -100;
2667         enum drbd_after_sb_p after_sb_1p;
2668
2669         rcu_read_lock();
2670         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2671         rcu_read_unlock();
2672         switch (after_sb_1p) {
2673         case ASB_DISCARD_YOUNGER_PRI:
2674         case ASB_DISCARD_OLDER_PRI:
2675         case ASB_DISCARD_LEAST_CHG:
2676         case ASB_DISCARD_LOCAL:
2677         case ASB_DISCARD_REMOTE:
2678         case ASB_DISCARD_ZERO_CHG:
2679                 drbd_err(device, "Configuration error.\n");
2680                 break;
2681         case ASB_DISCONNECT:
2682                 break;
2683         case ASB_CONSENSUS:
2684                 hg = drbd_asb_recover_0p(peer_device);
2685                 if (hg == -1 && device->state.role == R_SECONDARY)
2686                         rv = hg;
2687                 if (hg == 1  && device->state.role == R_PRIMARY)
2688                         rv = hg;
2689                 break;
2690         case ASB_VIOLENTLY:
2691                 rv = drbd_asb_recover_0p(peer_device);
2692                 break;
2693         case ASB_DISCARD_SECONDARY:
2694                 return device->state.role == R_PRIMARY ? 1 : -1;
2695         case ASB_CALL_HELPER:
2696                 hg = drbd_asb_recover_0p(peer_device);
2697                 if (hg == -1 && device->state.role == R_PRIMARY) {
2698                         enum drbd_state_rv rv2;
2699
2700                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2701                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2702                           * we do not need to wait for the after state change work either. */
2703                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2704                         if (rv2 != SS_SUCCESS) {
2705                                 drbd_khelper(device, "pri-lost-after-sb");
2706                         } else {
2707                                 drbd_warn(device, "Successfully gave up primary role.\n");
2708                                 rv = hg;
2709                         }
2710                 } else
2711                         rv = hg;
2712         }
2713
2714         return rv;
2715 }
2716
2717 /**
2718  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2719  */
2720 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2721 {
2722         struct drbd_device *device = peer_device->device;
2723         int hg, rv = -100;
2724         enum drbd_after_sb_p after_sb_2p;
2725
2726         rcu_read_lock();
2727         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2728         rcu_read_unlock();
2729         switch (after_sb_2p) {
2730         case ASB_DISCARD_YOUNGER_PRI:
2731         case ASB_DISCARD_OLDER_PRI:
2732         case ASB_DISCARD_LEAST_CHG:
2733         case ASB_DISCARD_LOCAL:
2734         case ASB_DISCARD_REMOTE:
2735         case ASB_CONSENSUS:
2736         case ASB_DISCARD_SECONDARY:
2737         case ASB_DISCARD_ZERO_CHG:
2738                 drbd_err(device, "Configuration error.\n");
2739                 break;
2740         case ASB_VIOLENTLY:
2741                 rv = drbd_asb_recover_0p(peer_device);
2742                 break;
2743         case ASB_DISCONNECT:
2744                 break;
2745         case ASB_CALL_HELPER:
2746                 hg = drbd_asb_recover_0p(peer_device);
2747                 if (hg == -1) {
2748                         enum drbd_state_rv rv2;
2749
2750                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE;
2751                           * we might be here in C_WF_REPORT_PARAMS, which is transient.
2752                           * We do not need to wait for the after-state-change work either. */
2753                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2754                         if (rv2 != SS_SUCCESS) {
2755                                 drbd_khelper(device, "pri-lost-after-sb");
2756                         } else {
2757                                 drbd_warn(device, "Successfully gave up primary role.\n");
2758                                 rv = hg;
2759                         }
2760                 } else
2761                         rv = hg;
2762         }
2763
2764         return rv;
2765 }
2766
2767 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2768                            u64 bits, u64 flags)
2769 {
2770         if (!uuid) {
2771                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2772                 return;
2773         }
2774         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2775              text,
2776              (unsigned long long)uuid[UI_CURRENT],
2777              (unsigned long long)uuid[UI_BITMAP],
2778              (unsigned long long)uuid[UI_HISTORY_START],
2779              (unsigned long long)uuid[UI_HISTORY_END],
2780              (unsigned long long)bits,
2781              (unsigned long long)flags);
2782 }
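
/*
 * For illustration, with made-up values: a call like
 *   drbd_uuid_dump(device, "self", device->ldev->md.uuid, 42, 0);
 * produces a log line along the lines of
 *   self 0123456789ABCDEF:FEDCBA9876543210:0000000000000004:0000000000000002 bits:42 flags:0
 * where the four hex fields are current:bitmap:history-start:history-end.
 */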
2783
2784 /*
2785   100   after split brain try auto recover
2786     2   C_SYNC_SOURCE set BitMap
2787     1   C_SYNC_SOURCE use BitMap
2788     0   no Sync
2789    -1   C_SYNC_TARGET use BitMap
2790    -2   C_SYNC_TARGET set BitMap
2791  -100   after split brain, disconnect
2792 -1000   unrelated data
2793 -1091   requires proto 91
2794 -1096   requires proto 96
2795  */
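/*
 * Reading the table: the sign selects the resync direction (positive: we
 * become sync source, negative: sync target) and a magnitude of 2 asks for
 * a full sync by first setting all bits in the bitmap; e.g. -2 means
 * "become C_SYNC_TARGET and resync the whole device".
 */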
2796 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2797 {
2798         u64 self, peer;
2799         int i, j;
2800
2801         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2802         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2803
2804         *rule_nr = 10;
2805         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2806                 return 0;
2807
2808         *rule_nr = 20;
2809         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2810              peer != UUID_JUST_CREATED)
2811                 return -2;
2812
2813         *rule_nr = 30;
2814         if (self != UUID_JUST_CREATED &&
2815             (peer == UUID_JUST_CREATED || peer == (u64)0))
2816                 return 2;
2817
2818         if (self == peer) {
2819                 int rct, dc; /* roles at crash time */
2820
2821                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2822
2823                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2824                                 return -1091;
2825
2826                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2827                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2828                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2829                                 drbd_uuid_move_history(device);
2830                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2831                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2832
2833                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2834                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2835                                 *rule_nr = 34;
2836                         } else {
2837                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2838                                 *rule_nr = 36;
2839                         }
2840
2841                         return 1;
2842                 }
2843
2844                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2845
2846                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2847                                 return -1091;
2848
2849                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2850                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2851                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2852
2853                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2854                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2855                                 device->p_uuid[UI_BITMAP] = 0UL;
2856
2857                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2858                                 *rule_nr = 35;
2859                         } else {
2860                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2861                                 *rule_nr = 37;
2862                         }
2863
2864                         return -1;
2865                 }
2866
2867                 /* Common power [off|failure] */
2868                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2869                         (device->p_uuid[UI_FLAGS] & 2);
2870                 /* lowest bit is set when we were primary,
2871                  * next bit (weight 2) is set when peer was primary */
2872                 *rule_nr = 40;
2873
2874                 switch (rct) {
2875                 case 0: /* !self_pri && !peer_pri */ return 0;
2876                 case 1: /*  self_pri && !peer_pri */ return 1;
2877                 case 2: /* !self_pri &&  peer_pri */ return -1;
2878                 case 3: /*  self_pri &&  peer_pri */
2879                         dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
2880                         return dc ? -1 : 1;
2881                 }
2882         }
2883
2884         *rule_nr = 50;
2885         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2886         if (self == peer)
2887                 return -1;
2888
2889         *rule_nr = 51;
2890         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
2891         if (self == peer) {
2892                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2893                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2894                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2895                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2896                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2897                            the peer made to its UUIDs when it last started a resync as sync source. */
2898
2899                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2900                                 return -1091;
2901
2902                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2903                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2904
2905                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2906                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2907
2908                         return -1;
2909                 }
2910         }
2911
2912         *rule_nr = 60;
2913         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2914         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2915                 peer = device->p_uuid[i] & ~((u64)1);
2916                 if (self == peer)
2917                         return -2;
2918         }
2919
2920         *rule_nr = 70;
2921         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2922         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2923         if (self == peer)
2924                 return 1;
2925
2926         *rule_nr = 71;
2927         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2928         if (self == peer) {
2929                 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2930                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2931                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2932                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2933                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2934                            we made to our UUIDs when we last started a resync as sync source. */
2935
2936                         if (first_peer_device(device)->connection->agreed_pro_version < 91)
2937                                 return -1091;
2938
2939                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2940                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2941
2942                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2943                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2944                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2945
2946                         return 1;
2947                 }
2948         }
2949
2950
2951         *rule_nr = 80;
2952         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2953         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2954                 self = device->ldev->md.uuid[i] & ~((u64)1);
2955                 if (self == peer)
2956                         return 2;
2957         }
2958
2959         *rule_nr = 90;
2960         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2961         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2962         if (self == peer && self != ((u64)0))
2963                 return 100;
2964
2965         *rule_nr = 100;
2966         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2967                 self = device->ldev->md.uuid[i] & ~((u64)1);
2968                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2969                         peer = device->p_uuid[j] & ~((u64)1);
2970                         if (self == peer)
2971                                 return -100;
2972                 }
2973         }
2974
2975         return -1000;
2976 }
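
/*
 * A worked example with hypothetical history: power failed while this node
 * was primary and the peer secondary; after reboot all UUIDs still match,
 * so we fall through to rule 40.  rct = 1 (our CRASHED_PRIMARY flag is set,
 * the peer's "was primary" bit of weight 2 is clear), hence we return 1 and
 * become sync source with a bitmap-based resync.
 */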
2977
2978 /* drbd_sync_handshake() returns the new conn state on success, or
2979    C_MASK (-1) on failure.
2980  */
2981 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
2982                                            enum drbd_role peer_role,
2983                                            enum drbd_disk_state peer_disk) __must_hold(local)
2984 {
2985         struct drbd_device *device = peer_device->device;
2986         enum drbd_conns rv = C_MASK;
2987         enum drbd_disk_state mydisk;
2988         struct net_conf *nc;
2989         int hg, rule_nr, rr_conflict, tentative;
2990
2991         mydisk = device->state.disk;
2992         if (mydisk == D_NEGOTIATING)
2993                 mydisk = device->new_state_tmp.disk;
2994
2995         drbd_info(device, "drbd_sync_handshake:\n");
2996
2997         spin_lock_irq(&device->ldev->md.uuid_lock);
2998         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2999         drbd_uuid_dump(device, "peer", device->p_uuid,
3000                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3001
3002         hg = drbd_uuid_compare(device, &rule_nr);
3003         spin_unlock_irq(&device->ldev->md.uuid_lock);
3004
3005         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3006
3007         if (hg == -1000) {
3008                 drbd_alert(device, "Unrelated data, aborting!\n");
3009                 return C_MASK;
3010         }
3011         if (hg < -1000) {
3012                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3013                 return C_MASK;
3014         }
3015
3016         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3017             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3018                 int f = (hg == -100) || abs(hg) == 2;
3019                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3020                 if (f)
3021                         hg = hg*2;
3022                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3023                      hg > 0 ? "source" : "target");
3024         }
3025
3026         if (abs(hg) == 100)
3027                 drbd_khelper(device, "initial-split-brain");
3028
3029         rcu_read_lock();
3030         nc = rcu_dereference(peer_device->connection->net_conf);
3031
3032         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3033                 int pcount = (device->state.role == R_PRIMARY)
3034                            + (peer_role == R_PRIMARY);
3035                 int forced = (hg == -100);
3036
3037                 switch (pcount) {
3038                 case 0:
3039                         hg = drbd_asb_recover_0p(peer_device);
3040                         break;
3041                 case 1:
3042                         hg = drbd_asb_recover_1p(peer_device);
3043                         break;
3044                 case 2:
3045                         hg = drbd_asb_recover_2p(peer_device);
3046                         break;
3047                 }
3048                 if (abs(hg) < 100) {
3049                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3050                              "automatically solved. Sync from %s node\n",
3051                              pcount, (hg < 0) ? "peer" : "this");
3052                         if (forced) {
3053                                 drbd_warn(device, "Doing a full sync, since"
3054                                      " UUIDs were ambiguous.\n");
3055                                 hg = hg*2;
3056                         }
3057                 }
3058         }
3059
3060         if (hg == -100) {
3061                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3062                         hg = -1;
3063                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3064                         hg = 1;
3065
3066                 if (abs(hg) < 100)
3067                         drbd_warn(device, "Split-Brain detected, manually solved. "
3068                              "Sync from %s node\n",
3069                              (hg < 0) ? "peer" : "this");
3070         }
3071         rr_conflict = nc->rr_conflict;
3072         tentative = nc->tentative;
3073         rcu_read_unlock();
3074
3075         if (hg == -100) {
3076                 /* FIXME this log message is not correct if we end up here
3077                  * after an attempted attach on a diskless node.
3078                  * We just refuse to attach -- well, we drop the "connection"
3079                  * to that disk, in a way... */
3080                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3081                 drbd_khelper(device, "split-brain");
3082                 return C_MASK;
3083         }
3084
3085         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3086                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3087                 return C_MASK;
3088         }
3089
3090         if (hg < 0 && /* by intention we do not use mydisk here. */
3091             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3092                 switch (rr_conflict) {
3093                 case ASB_CALL_HELPER:
3094                         drbd_khelper(device, "pri-lost");
3095                         /* fall through */
3096                 case ASB_DISCONNECT:
3097                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3098                         return C_MASK;
3099                 case ASB_VIOLENTLY:
3100                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3101                              " assumption\n");
3102                 }
3103         }
3104
3105         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3106                 if (hg == 0)
3107                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3108                 else
3109                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3110                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3111                                  abs(hg) >= 2 ? "full" : "bit-map based");
3112                 return C_MASK;
3113         }
3114
3115         if (abs(hg) >= 2) {
3116                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3117                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3118                                         BM_LOCKED_SET_ALLOWED))
3119                         return C_MASK;
3120         }
3121
3122         if (hg > 0) { /* become sync source. */
3123                 rv = C_WF_BITMAP_S;
3124         } else if (hg < 0) { /* become sync target */
3125                 rv = C_WF_BITMAP_T;
3126         } else {
3127                 rv = C_CONNECTED;
3128                 if (drbd_bm_total_weight(device)) {
3129                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3130                              drbd_bm_total_weight(device));
3131                 }
3132         }
3133
3134         return rv;
3135 }
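
/*
 * Sketch of the caller's side of this contract (receive_state() below does
 * essentially this):
 *
 *   ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
 *   if (ns.conn == C_MASK)
 *           ...handshake failed: force-detach, renegotiate or disconnect...
 */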
3136
3137 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3138 {
3139         /* the pairing ASB_DISCARD_REMOTE <-> ASB_DISCARD_LOCAL is valid */
3140         if (peer == ASB_DISCARD_REMOTE)
3141                 return ASB_DISCARD_LOCAL;
3142
3143         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3144         if (peer == ASB_DISCARD_LOCAL)
3145                 return ASB_DISCARD_REMOTE;
3146
3147         /* everything else is valid if they are equal on both sides. */
3148         return peer;
3149 }
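
/*
 * Example: if our after-sb-0pri is "discard-local", a correctly configured
 * peer uses "discard-remote".  The peer sends ASB_DISCARD_REMOTE, which
 * convert_after_sb() maps to ASB_DISCARD_LOCAL, so it matches nc->after_sb_0p
 * in the compatibility checks of receive_protocol() below.
 */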
3150
3151 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3152 {
3153         struct p_protocol *p = pi->data;
3154         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3155         int p_proto, p_discard_my_data, p_two_primaries, cf;
3156         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3157         char integrity_alg[SHARED_SECRET_MAX] = "";
3158         struct crypto_hash *peer_integrity_tfm = NULL;
3159         void *int_dig_in = NULL, *int_dig_vv = NULL;
3160
3161         p_proto         = be32_to_cpu(p->protocol);
3162         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3163         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3164         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3165         p_two_primaries = be32_to_cpu(p->two_primaries);
3166         cf              = be32_to_cpu(p->conn_flags);
3167         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3168
3169         if (connection->agreed_pro_version >= 87) {
3170                 int err;
3171
3172                 if (pi->size > sizeof(integrity_alg))
3173                         return -EIO;
3174                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3175                 if (err)
3176                         return err;
3177                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3178         }
3179
3180         if (pi->cmd != P_PROTOCOL_UPDATE) {
3181                 clear_bit(CONN_DRY_RUN, &connection->flags);
3182
3183                 if (cf & CF_DRY_RUN)
3184                         set_bit(CONN_DRY_RUN, &connection->flags);
3185
3186                 rcu_read_lock();
3187                 nc = rcu_dereference(connection->net_conf);
3188
3189                 if (p_proto != nc->wire_protocol) {
3190                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3191                         goto disconnect_rcu_unlock;
3192                 }
3193
3194                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3195                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3196                         goto disconnect_rcu_unlock;
3197                 }
3198
3199                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3200                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3201                         goto disconnect_rcu_unlock;
3202                 }
3203
3204                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3205                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3206                         goto disconnect_rcu_unlock;
3207                 }
3208
3209                 if (p_discard_my_data && nc->discard_my_data) {
3210                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3211                         goto disconnect_rcu_unlock;
3212                 }
3213
3214                 if (p_two_primaries != nc->two_primaries) {
3215                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3216                         goto disconnect_rcu_unlock;
3217                 }
3218
3219                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3220                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3221                         goto disconnect_rcu_unlock;
3222                 }
3223
3224                 rcu_read_unlock();
3225         }
3226
3227         if (integrity_alg[0]) {
3228                 int hash_size;
3229
3230                 /*
3231                  * We can only change the peer data integrity algorithm
3232                  * here.  Changing our own data integrity algorithm
3233                  * requires that we send a P_PROTOCOL_UPDATE packet at
3234                  * the same time; otherwise, the peer has no way to
3235                  * tell between which packets the algorithm should
3236                  * change.
3237                  */
3238
3239                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3240                 if (IS_ERR(peer_integrity_tfm)) {
3241                         /* crypto_alloc_hash() returns ERR_PTR(), never NULL;
3242                          * reset it so the error path's crypto_free_hash() is safe */
3243                         peer_integrity_tfm = NULL;
3244                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3245                                  integrity_alg);
3246                         goto disconnect;
3247                 }
3245
3246                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3247                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3248                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3249                 if (!(int_dig_in && int_dig_vv)) {
3250                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3251                         goto disconnect;
3252                 }
3253         }
3254
3255         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3256         if (!new_net_conf) {
3257                 drbd_err(connection, "Allocation of new net_conf failed\n");
3258                 goto disconnect;
3259         }
3260
3261         mutex_lock(&connection->data.mutex);
3262         mutex_lock(&connection->resource->conf_update);
3263         old_net_conf = connection->net_conf;
3264         *new_net_conf = *old_net_conf;
3265
3266         new_net_conf->wire_protocol = p_proto;
3267         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3268         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3269         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3270         new_net_conf->two_primaries = p_two_primaries;
3271
3272         rcu_assign_pointer(connection->net_conf, new_net_conf);
3273         mutex_unlock(&connection->resource->conf_update);
3274         mutex_unlock(&connection->data.mutex);
3275
3276         crypto_free_hash(connection->peer_integrity_tfm);
3277         kfree(connection->int_dig_in);
3278         kfree(connection->int_dig_vv);
3279         connection->peer_integrity_tfm = peer_integrity_tfm;
3280         connection->int_dig_in = int_dig_in;
3281         connection->int_dig_vv = int_dig_vv;
3282
3283         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3284                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3285                           integrity_alg[0] ? integrity_alg : "(none)");
3286
3287         synchronize_rcu();
3288         kfree(old_net_conf);
3289         return 0;
3290
3291 disconnect_rcu_unlock:
3292         rcu_read_unlock();
3293 disconnect:
3294         crypto_free_hash(peer_integrity_tfm);
3295         kfree(int_dig_in);
3296         kfree(int_dig_vv);
3297         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3298         return -EIO;
3299 }
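
/*
 * The net_conf update above follows the usual RCU publish idiom; a minimal
 * sketch, using the names from this file:
 *
 *   new_net_conf = kmalloc(sizeof(*new_net_conf), GFP_KERNEL);
 *   *new_net_conf = *old_net_conf;               // copy, then modify the copy
 *   rcu_assign_pointer(connection->net_conf, new_net_conf);
 *   synchronize_rcu();                           // wait for pre-existing readers
 *   kfree(old_net_conf);                         // nobody can see the old copy now
 */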
3300
3301 /* helper function
3302  * input: alg name, feature name
3303  * return: NULL (alg name was "")
3304  *         ERR_PTR(error) if something goes wrong
3305  *         or the crypto hash ptr, if it worked out ok. */
3306 static
3307 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3308                 const char *alg, const char *name)
3309 {
3310         struct crypto_hash *tfm;
3311
3312         if (!alg[0])
3313                 return NULL;
3314
3315         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3316         if (IS_ERR(tfm)) {
3317                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3318                         alg, name, PTR_ERR(tfm));
3319                 return tfm;
3320         }
3321         return tfm;
3322 }
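
/*
 * Callers must distinguish all three outcomes; receive_SyncParam() below
 * does, for instance:
 *
 *   verify_tfm = drbd_crypto_alloc_digest_safe(device, p->verify_alg, "verify-alg");
 *   if (IS_ERR(verify_tfm)) { verify_tfm = NULL; goto disconnect; }
 *
 * while a NULL return (empty algorithm name) simply means "no digest".
 */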
3323
3324 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3325 {
3326         void *buffer = connection->data.rbuf;
3327         int size = pi->size;
3328
3329         while (size) {
3330                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3331                 s = drbd_recv(connection, buffer, s);
3332                 if (s <= 0) {
3333                         if (s < 0)
3334                                 return s;
3335                         break;
3336                 }
3337                 size -= s;
3338         }
3339         if (size)
3340                 return -EIO;
3341         return 0;
3342 }
3343
3344 /*
3345  * config_unknown_volume  -  device configuration command for unknown volume
3346  *
3347  * When a device is added to an existing connection, the node on which the
3348  * device is added first will send configuration commands to its peer but the
3349  * peer will not know about the device yet.  It will warn and ignore these
3350  * commands.  Once the device is added on the second node, the second node will
3351  * send the same device configuration commands, but in the other direction.
3352  *
3353  * (We can also end up here if drbd is misconfigured.)
3354  */
3355 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3356 {
3357         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3358                   cmdname(pi->cmd), pi->vnr);
3359         return ignore_remaining_packet(connection, pi);
3360 }
3361
3362 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3363 {
3364         struct drbd_peer_device *peer_device;
3365         struct drbd_device *device;
3366         struct p_rs_param_95 *p;
3367         unsigned int header_size, data_size, exp_max_sz;
3368         struct crypto_hash *verify_tfm = NULL;
3369         struct crypto_hash *csums_tfm = NULL;
3370         struct net_conf *old_net_conf, *new_net_conf = NULL;
3371         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3372         const int apv = connection->agreed_pro_version;
3373         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3374         int fifo_size = 0;
3375         int err;
3376
3377         peer_device = conn_peer_device(connection, pi->vnr);
3378         if (!peer_device)
3379                 return config_unknown_volume(connection, pi);
3380         device = peer_device->device;
3381
3382         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3383                     : apv == 88 ? sizeof(struct p_rs_param)
3384                                         + SHARED_SECRET_MAX
3385                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3386                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3387
3388         if (pi->size > exp_max_sz) {
3389                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3390                     pi->size, exp_max_sz);
3391                 return -EIO;
3392         }
3393
3394         if (apv <= 88) {
3395                 header_size = sizeof(struct p_rs_param);
3396                 data_size = pi->size - header_size;
3397         } else if (apv <= 94) {
3398                 header_size = sizeof(struct p_rs_param_89);
3399                 data_size = pi->size - header_size;
3400                 D_ASSERT(device, data_size == 0);
3401         } else {
3402                 header_size = sizeof(struct p_rs_param_95);
3403                 data_size = pi->size - header_size;
3404                 D_ASSERT(device, data_size == 0);
3405         }
3406
3407         /* initialize verify_alg and csums_alg (adjacent arrays, so one memset clears both) */
3408         p = pi->data;
3409         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3410
3411         err = drbd_recv_all(peer_device->connection, p, header_size);
3412         if (err)
3413                 return err;
3414
3415         mutex_lock(&connection->resource->conf_update);
3416         old_net_conf = peer_device->connection->net_conf;
3417         if (get_ldev(device)) {
3418                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3419                 if (!new_disk_conf) {
3420                         put_ldev(device);
3421                         mutex_unlock(&connection->resource->conf_update);
3422                         drbd_err(device, "Allocation of new disk_conf failed\n");
3423                         return -ENOMEM;
3424                 }
3425
3426                 old_disk_conf = device->ldev->disk_conf;
3427                 *new_disk_conf = *old_disk_conf;
3428
3429                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3430         }
3431
3432         if (apv >= 88) {
3433                 if (apv == 88) {
3434                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3435                                 drbd_err(device, "verify-alg of wrong size, "
3436                                         "peer wants %u, accepting only up to %u bytes\n",
3437                                         data_size, SHARED_SECRET_MAX);
3438                                 err = -EIO;
3439                                 goto reconnect;
3440                         }
3441
3442                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3443                         if (err)
3444                                 goto reconnect;
3445                         /* we expect NUL terminated string */
3446                         /* but just in case someone tries to be evil */
3447                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3448                         p->verify_alg[data_size-1] = 0;
3449
3450                 } else /* apv >= 89 */ {
3451                         /* we still expect NUL terminated strings */
3452                         /* but just in case someone tries to be evil */
3453                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3454                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3455                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3456                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3457                 }
3458
3459                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3460                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3461                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3462                                     old_net_conf->verify_alg, p->verify_alg);
3463                                 goto disconnect;
3464                         }
3465                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3466                                         p->verify_alg, "verify-alg");
3467                         if (IS_ERR(verify_tfm)) {
3468                                 verify_tfm = NULL;
3469                                 goto disconnect;
3470                         }
3471                 }
3472
3473                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3474                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3475                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3476                                     old_net_conf->csums_alg, p->csums_alg);
3477                                 goto disconnect;
3478                         }
3479                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3480                                         p->csums_alg, "csums-alg");
3481                         if (IS_ERR(csums_tfm)) {
3482                                 csums_tfm = NULL;
3483                                 goto disconnect;
3484                         }
3485                 }
3486
3487                 if (apv > 94 && new_disk_conf) {
3488                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3489                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3490                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3491                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3492
3493                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3494                         if (fifo_size != device->rs_plan_s->size) {
3495                                 new_plan = fifo_alloc(fifo_size);
3496                                 if (!new_plan) {
3497                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3498                                         put_ldev(device);
3499                                         goto disconnect;
3500                                 }
3501                         }
3502                 }
3503
3504                 if (verify_tfm || csums_tfm) {
3505                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3506                         if (!new_net_conf) {
3507                                 drbd_err(device, "Allocation of new net_conf failed\n");
3508                                 goto disconnect;
3509                         }
3510
3511                         *new_net_conf = *old_net_conf;
3512
3513                         if (verify_tfm) {
3514                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3515                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3516                                 crypto_free_hash(peer_device->connection->verify_tfm);
3517                                 peer_device->connection->verify_tfm = verify_tfm;
3518                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3519                         }
3520                         if (csums_tfm) {
3521                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3522                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3523                                 crypto_free_hash(peer_device->connection->csums_tfm);
3524                                 peer_device->connection->csums_tfm = csums_tfm;
3525                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3526                         }
3527                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3528                 }
3529         }
3530
3531         if (new_disk_conf) {
3532                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3533                 put_ldev(device);
3534         }
3535
3536         if (new_plan) {
3537                 old_plan = device->rs_plan_s;
3538                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3539         }
3540
3541         mutex_unlock(&connection->resource->conf_update);
3542         synchronize_rcu();
3543         if (new_net_conf)
3544                 kfree(old_net_conf);
3545         kfree(old_disk_conf);
3546         kfree(old_plan);
3547
3548         return 0;
3549
3550 reconnect:
3551         if (new_disk_conf) {
3552                 put_ldev(device);
3553                 kfree(new_disk_conf);
3554         }
3555         mutex_unlock(&connection->resource->conf_update);
3556         return -EIO;
3557
3558 disconnect:
3559         kfree(new_plan);
3560         if (new_disk_conf) {
3561                 put_ldev(device);
3562                 kfree(new_disk_conf);
3563         }
3564         mutex_unlock(&connection->resource->conf_update);
3565         /* free any digests allocated above but not yet installed on the
3566          * connection; several error paths get here, so both may be set */
3567         crypto_free_hash(csums_tfm);
3568         crypto_free_hash(verify_tfm);
3570         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3571         return -EIO;
3572 }
3573
3574 /* warn if the arguments differ by more than 12.5% */
3575 static void warn_if_differ_considerably(struct drbd_device *device,
3576         const char *s, sector_t a, sector_t b)
3577 {
3578         sector_t d;
3579         if (a == 0 || b == 0)
3580                 return;
3581         d = (a > b) ? (a - b) : (b - a);
3582         if (d > (a>>3) || d > (b>>3))
3583                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3584                      (unsigned long long)a, (unsigned long long)b);
3585 }
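
/*
 * The 12.5% threshold is a shift: x>>3 == x/8.  With hypothetical sizes
 * a = 1000 and b = 880 sectors, d = 120 exceeds b>>3 = 110, so the warning
 * fires; with a = 1000 and b = 900, d = 100 stays below both a>>3 = 125 and
 * b>>3 = 112 and nothing is logged.
 */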
3586
3587 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3588 {
3589         struct drbd_peer_device *peer_device;
3590         struct drbd_device *device;
3591         struct p_sizes *p = pi->data;
3592         enum determine_dev_size dd = DS_UNCHANGED;
3593         sector_t p_size, p_usize, my_usize;
3594         int ldsc = 0; /* local disk size changed */
3595         enum dds_flags ddsf;
3596
3597         peer_device = conn_peer_device(connection, pi->vnr);
3598         if (!peer_device)
3599                 return config_unknown_volume(connection, pi);
3600         device = peer_device->device;
3601
3602         p_size = be64_to_cpu(p->d_size);
3603         p_usize = be64_to_cpu(p->u_size);
3604
3605         /* just store the peer's disk size for now.
3606          * we still need to figure out whether we accept that. */
3607         device->p_size = p_size;
3608
3609         if (get_ldev(device)) {
3610                 rcu_read_lock();
3611                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3612                 rcu_read_unlock();
3613
3614                 warn_if_differ_considerably(device, "lower level device sizes",
3615                            p_size, drbd_get_max_capacity(device->ldev));
3616                 warn_if_differ_considerably(device, "user requested size",
3617                                             p_usize, my_usize);
3618
3619                 /* if this is the first connect, or an otherwise expected
3620                  * param exchange, choose the minimum */
3621                 if (device->state.conn == C_WF_REPORT_PARAMS)
3622                         p_usize = min_not_zero(my_usize, p_usize);
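                /* min_not_zero() above treats 0 as "not configured": e.g.
                 * (0, 7) -> 7 and (4, 7) -> 4, so the smaller explicit
                 * user-requested size wins. */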
3623
3624                 /* Never shrink a device with usable data during connect.
3625                    But allow online shrinking if we are connected. */
3626                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3627                     drbd_get_capacity(device->this_bdev) &&
3628                     device->state.disk >= D_OUTDATED &&
3629                     device->state.conn < C_CONNECTED) {
3630                         drbd_err(device, "The peer's disk size is too small!\n");
3631                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3632                         put_ldev(device);
3633                         return -EIO;
3634                 }
3635
3636                 if (my_usize != p_usize) {
3637                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3638
3639                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3640                         if (!new_disk_conf) {
3641                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3642                                 put_ldev(device);
3643                                 return -ENOMEM;
3644                         }
3645
3646                         mutex_lock(&connection->resource->conf_update);
3647                         old_disk_conf = device->ldev->disk_conf;
3648                         *new_disk_conf = *old_disk_conf;
3649                         new_disk_conf->disk_size = p_usize;
3650
3651                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3652                         mutex_unlock(&connection->resource->conf_update);
3653                         synchronize_rcu();
3654                         kfree(old_disk_conf);
3655
3656                         drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
3657                                  (unsigned long)p_usize, (unsigned long)my_usize);
3658                 }
3659
3660                 put_ldev(device);
3661         }
3662
3663         ddsf = be16_to_cpu(p->dds_flags);
3664         if (get_ldev(device)) {
3665                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3666                 put_ldev(device);
3667                 if (dd == DS_ERROR)
3668                         return -EIO;
3669                 drbd_md_sync(device);
3670         } else {
3671                 /* I am diskless, need to accept the peer's size. */
3672                 drbd_set_my_capacity(device, p_size);
3673         }
3674
3675         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3676         drbd_reconsider_max_bio_size(device);
3677
3678         if (get_ldev(device)) {
3679                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3680                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3681                         ldsc = 1;
3682                 }
3683
3684                 put_ldev(device);
3685         }
3686
3687         if (device->state.conn > C_WF_REPORT_PARAMS) {
3688                 if (be64_to_cpu(p->c_size) !=
3689                     drbd_get_capacity(device->this_bdev) || ldsc) {
3690                         /* we have different sizes, probably peer
3691                          * needs to know my new size... */
3692                         drbd_send_sizes(peer_device, 0, ddsf);
3693                 }
3694                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3695                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3696                         if (device->state.pdsk >= D_INCONSISTENT &&
3697                             device->state.disk >= D_INCONSISTENT) {
3698                                 if (ddsf & DDSF_NO_RESYNC)
3699                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3700                                 else
3701                                         resync_after_online_grow(device);
3702                         } else
3703                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3704                 }
3705         }
3706
3707         return 0;
3708 }
3709
3710 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3711 {
3712         struct drbd_peer_device *peer_device;
3713         struct drbd_device *device;
3714         struct p_uuids *p = pi->data;
3715         u64 *p_uuid;
3716         int i, updated_uuids = 0;
3717
3718         peer_device = conn_peer_device(connection, pi->vnr);
3719         if (!peer_device)
3720                 return config_unknown_volume(connection, pi);
3721         device = peer_device->device;
3722
3723         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3724         if (!p_uuid) {
3725                 drbd_err(device, "kmalloc of p_uuid failed\n");
3726                 return -ENOMEM;
3727         }
3728
3729         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3730                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3731
3732         kfree(device->p_uuid);
3733         device->p_uuid = p_uuid;
3734
3735         if (device->state.conn < C_CONNECTED &&
3736             device->state.disk < D_INCONSISTENT &&
3737             device->state.role == R_PRIMARY &&
3738             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3739                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3740                     (unsigned long long)device->ed_uuid);
3741                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3742                 return -EIO;
3743         }
3744
3745         if (get_ldev(device)) {
3746                 int skip_initial_sync =
3747                         device->state.conn == C_CONNECTED &&
3748                         peer_device->connection->agreed_pro_version >= 90 &&
3749                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3750                         (p_uuid[UI_FLAGS] & 8);
3751                 if (skip_initial_sync) {
3752                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3753                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3754                                         "clear_n_write from receive_uuids",
3755                                         BM_LOCKED_TEST_ALLOWED);
3756                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3757                         _drbd_uuid_set(device, UI_BITMAP, 0);
3758                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3759                                         CS_VERBOSE, NULL);
3760                         drbd_md_sync(device);
3761                         updated_uuids = 1;
3762                 }
3763                 put_ldev(device);
3764         } else if (device->state.disk < D_INCONSISTENT &&
3765                    device->state.role == R_PRIMARY) {
3766                 /* I am a diskless primary, the peer just created a new current UUID
3767                    for me. */
3768                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3769         }
3770
3771         /* Before we test for the disk state, we should wait until a possibly
3772            ongoing cluster-wide state change has finished. That is important if
3773            we are primary and are detaching from our disk. We need to see the
3774            new disk state... */
3775         mutex_lock(device->state_mutex);
3776         mutex_unlock(device->state_mutex);
3777         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3778                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3779
3780         if (updated_uuids)
3781                 drbd_print_uuids(device, "receiver updated UUIDs to");
3782
3783         return 0;
3784 }
3785
3786 /**
3787  * convert_state() - Converts the peer's view of the cluster state to our point of view
3788  * @ps:         The state as seen by the peer.
3789  */
3790 static union drbd_state convert_state(union drbd_state ps)
3791 {
3792         union drbd_state ms;
3793
3794         static enum drbd_conns c_tab[] = {
3795                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3796                 [C_CONNECTED] = C_CONNECTED,
3797
3798                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3799                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3800                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3801                 [C_VERIFY_S]       = C_VERIFY_T,
3802                 [C_MASK]   = C_MASK,
3803         };
3804
3805         ms.i = ps.i;
3806
3807         ms.conn = c_tab[ps.conn];
3808         ms.peer = ps.role;
3809         ms.role = ps.peer;
3810         ms.pdsk = ps.disk;
3811         ms.disk = ps.pdsk;
3812         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3813
3814         return ms;
3815 }
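
/*
 * Example with illustrative values: the peer reports role=Primary,
 * peer=Secondary, disk=UpToDate, pdsk=Inconsistent.  Seen from here that
 * becomes role=Secondary, peer=Primary, disk=Inconsistent, pdsk=UpToDate;
 * asymmetric connection states (starting-sync, verify) are mirrored via
 * c_tab[].
 */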
3816
3817 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3818 {
3819         struct drbd_peer_device *peer_device;
3820         struct drbd_device *device;
3821         struct p_req_state *p = pi->data;
3822         union drbd_state mask, val;
3823         enum drbd_state_rv rv;
3824
3825         peer_device = conn_peer_device(connection, pi->vnr);
3826         if (!peer_device)
3827                 return -EIO;
3828         device = peer_device->device;
3829
3830         mask.i = be32_to_cpu(p->mask);
3831         val.i = be32_to_cpu(p->val);
3832
3833         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3834             mutex_is_locked(device->state_mutex)) {
3835                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3836                 return 0;
3837         }
3838
3839         mask = convert_state(mask);
3840         val = convert_state(val);
3841
3842         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3843         drbd_send_sr_reply(peer_device, rv);
3844
3845         drbd_md_sync(device);
3846
3847         return 0;
3848 }
3849
3850 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3851 {
3852         struct p_req_state *p = pi->data;
3853         union drbd_state mask, val;
3854         enum drbd_state_rv rv;
3855
3856         mask.i = be32_to_cpu(p->mask);
3857         val.i = be32_to_cpu(p->val);
3858
3859         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3860             mutex_is_locked(&connection->cstate_mutex)) {
3861                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3862                 return 0;
3863         }
3864
3865         mask = convert_state(mask);
3866         val = convert_state(val);
3867
3868         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3869         conn_send_sr_reply(connection, rv);
3870
3871         return 0;
3872 }
3873
3874 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3875 {
3876         struct drbd_peer_device *peer_device;
3877         struct drbd_device *device;
3878         struct p_state *p = pi->data;
3879         union drbd_state os, ns, peer_state;
3880         enum drbd_disk_state real_peer_disk;
3881         enum chg_state_flags cs_flags;
3882         int rv;
3883
3884         peer_device = conn_peer_device(connection, pi->vnr);
3885         if (!peer_device)
3886                 return config_unknown_volume(connection, pi);
3887         device = peer_device->device;
3888
3889         peer_state.i = be32_to_cpu(p->state);
3890
3891         real_peer_disk = peer_state.disk;
3892         if (peer_state.disk == D_NEGOTIATING) {
3893                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3894                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3895         }
3896
3897         spin_lock_irq(&device->resource->req_lock);
3898  retry:
3899         os = ns = drbd_read_state(device);
3900         spin_unlock_irq(&device->resource->req_lock);
3901
3902         /* If some other part of the code (asender thread, timeout)
3903          * already decided to close the connection again,
3904          * we must not "re-establish" it here. */
3905         if (os.conn <= C_TEAR_DOWN)
3906                 return -ECONNRESET;
3907
3908         /* If this is the "end of sync" confirmation, usually the peer disk
3909          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3910          * set) resync started in PausedSyncT, or if the timing of pause-/
3911          * unpause-sync events has been "just right", the peer disk may
3912          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3913          */
3914         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3915             real_peer_disk == D_UP_TO_DATE &&
3916             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3917                 /* If we are (becoming) SyncSource, but peer is still in sync
3918                  * preparation, ignore its uptodate-ness to avoid flapping, it
3919                  * will change to inconsistent once the peer reaches active
3920                  * syncing states.
3921                  * It may have changed syncer-paused flags, however, so we
3922                  * cannot ignore this completely. */
3923                 if (peer_state.conn > C_CONNECTED &&
3924                     peer_state.conn < C_SYNC_SOURCE)
3925                         real_peer_disk = D_INCONSISTENT;
3926
3927                 /* if peer_state changes to connected at the same time,
3928                  * it explicitly notifies us that it finished resync.
3929                  * Maybe we should finish it up, too? */
3930                 else if (os.conn >= C_SYNC_SOURCE &&
3931                          peer_state.conn == C_CONNECTED) {
3932                         if (drbd_bm_total_weight(device) <= device->rs_failed)
3933                                 drbd_resync_finished(device);
3934                         return 0;
3935                 }
3936         }
3937
3938         /* explicit verify finished notification, stop sector reached. */
3939         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3940             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3941                 ov_out_of_sync_print(device);
3942                 drbd_resync_finished(device);
3943                 return 0;
3944         }
3945
3946         /* peer says his disk is inconsistent, while we think it is uptodate,
3947          * and this happens while the peer still thinks we have a sync going on,
3948          * but we think we are already done with the sync.
3949          * We ignore this to avoid flapping pdsk.
3950          * This should not happen, if the peer is a recent version of drbd. */
3951         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3952             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3953                 real_peer_disk = D_UP_TO_DATE;
3954
3955         if (ns.conn == C_WF_REPORT_PARAMS)
3956                 ns.conn = C_CONNECTED;
3957
3958         if (peer_state.conn == C_AHEAD)
3959                 ns.conn = C_BEHIND;
3960
3961         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3962             get_ldev_if_state(device, D_NEGOTIATING)) {
3963                 int cr; /* consider resync */
3964
3965                 /* if we established a new connection */
3966                 cr  = (os.conn < C_CONNECTED);
3967                 /* if we had an established connection
3968                  * and one of the nodes newly attaches a disk */
3969                 cr |= (os.conn == C_CONNECTED &&
3970                        (peer_state.disk == D_NEGOTIATING ||
3971                         os.disk == D_NEGOTIATING));
3972                 /* if we have both been inconsistent, and the peer has been
3973                  * forced to be UpToDate with --overwrite-data */
3974                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
3975                 /* if we had been plain connected, and the admin requested to
3976                  * start a sync by "invalidate" or "invalidate-remote" */
3977                 cr |= (os.conn == C_CONNECTED &&
3978                                 (peer_state.conn >= C_STARTING_SYNC_S &&
3979                                  peer_state.conn <= C_WF_BITMAP_T));
3980
3981                 if (cr)
3982                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
3983
3984                 put_ldev(device);
3985                 if (ns.conn == C_MASK) {
3986                         ns.conn = C_CONNECTED;
3987                         if (device->state.disk == D_NEGOTIATING) {
3988                                 drbd_force_state(device, NS(disk, D_FAILED));
3989                         } else if (peer_state.disk == D_NEGOTIATING) {
3990                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
3991                                 peer_state.disk = D_DISKLESS;
3992                                 real_peer_disk = D_DISKLESS;
3993                         } else {
3994                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
3995                                         return -EIO;
3996                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
3997                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3998                                 return -EIO;
3999                         }
4000                 }
4001         }
4002
4003         spin_lock_irq(&device->resource->req_lock);
4004         if (os.i != drbd_read_state(device).i)
4005                 goto retry;
4006         clear_bit(CONSIDER_RESYNC, &device->flags);
4007         ns.peer = peer_state.role;
4008         ns.pdsk = real_peer_disk;
4009         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4010         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4011                 ns.disk = device->new_state_tmp.disk;
4012         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4013         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4014             test_bit(NEW_CUR_UUID, &device->flags)) {
4015                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4016                    for temporary network outages! */
4017                 spin_unlock_irq(&device->resource->req_lock);
4018                 drbd_err(device, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
4019                 tl_clear(peer_device->connection);
4020                 drbd_uuid_new_current(device);
4021                 clear_bit(NEW_CUR_UUID, &device->flags);
4022                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4023                 return -EIO;
4024         }
4025         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4026         ns = drbd_read_state(device);
4027         spin_unlock_irq(&device->resource->req_lock);
4028
4029         if (rv < SS_SUCCESS) {
4030                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4031                 return -EIO;
4032         }
4033
4034         if (os.conn > C_WF_REPORT_PARAMS) {
4035                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4036             peer_state.disk != D_NEGOTIATING) {
4037                         /* we want resync, peer has not yet decided to sync... */
4038                         /* Nowadays only used when forcing a node into primary role and
4039                            setting its disk to UpToDate with that */
4040                         drbd_send_uuids(peer_device);
4041                         drbd_send_current_state(peer_device);
4042                 }
4043         }
4044
4045         clear_bit(DISCARD_MY_DATA, &device->flags);
4046
4047         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4048
4049         return 0;
4050 }
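
/*
 * Illustration (standalone userspace sketch, not part of the driver):
 * receive_state() above samples the device state under req_lock, makes
 * its decisions unlocked, and re-takes the lock to verify the state did
 * not change in between - retrying if it did.  A minimal sketch of that
 * optimistic read / re-check / retry pattern, with a plain int standing
 * in for the state union:
 */
#if 0
#include <stdio.h>

static int state = 5;   /* shared state, normally guarded by req_lock */

int main(void)
{
        int os;

retry:
        /* lock(); */
        os = state;             /* os = ns = drbd_read_state() */
        /* unlock(); */

        /* ... unlocked decisions based on the sampled os ... */

        /* lock(); */
        if (os != state)        /* someone changed it meanwhile? */
                goto retry;
        state = os + 1;         /* commit: _drbd_set_state() */
        /* unlock(); */

        printf("state=%d\n", state);
        return 0;
}
#endif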
4051
4052 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4053 {
4054         struct drbd_peer_device *peer_device;
4055         struct drbd_device *device;
4056         struct p_rs_uuid *p = pi->data;
4057
4058         peer_device = conn_peer_device(connection, pi->vnr);
4059         if (!peer_device)
4060                 return -EIO;
4061         device = peer_device->device;
4062
4063         wait_event(device->misc_wait,
4064                    device->state.conn == C_WF_SYNC_UUID ||
4065                    device->state.conn == C_BEHIND ||
4066                    device->state.conn < C_CONNECTED ||
4067                    device->state.disk < D_NEGOTIATING);
4068
4069         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4070
4071         /* Here the _drbd_uuid_ functions are right, current should
4072            _not_ be rotated into the history */
4073         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4074                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4075                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4076
4077                 drbd_print_uuids(device, "updated sync uuid");
4078                 drbd_start_resync(device, C_SYNC_TARGET);
4079
4080                 put_ldev(device);
4081         } else
4082                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4083
4084         return 0;
4085 }
4086
4087 /**
4088  * receive_bitmap_plain - receive and apply one plain (uncompressed) bitmap packet
4089  *
4090  * Return 0 when done, 1 when another iteration is needed, and a negative error
4091  * code upon failure.
4092  */
4093 static int
4094 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4095                      unsigned long *p, struct bm_xfer_ctx *c)
4096 {
4097         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4098                                  drbd_header_size(peer_device->connection);
4099         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4100                                        c->bm_words - c->word_offset);
4101         unsigned int want = num_words * sizeof(*p);
4102         int err;
4103
4104         if (want != size) {
4105                 drbd_err(peer_device, "%s: want (%u) != size (%u)\n", __func__, want, size);
4106                 return -EIO;
4107         }
4108         if (want == 0)
4109                 return 0;
4110         err = drbd_recv_all(peer_device->connection, p, want);
4111         if (err)
4112                 return err;
4113
4114         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4115
4116         c->word_offset += num_words;
4117         c->bit_offset = c->word_offset * BITS_PER_LONG;
4118         if (c->bit_offset > c->bm_bits)
4119                 c->bit_offset = c->bm_bits;
4120
4121         return 1;
4122 }
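
/*
 * Illustration (standalone userspace sketch, not part of the driver):
 * the chunking arithmetic of receive_bitmap_plain() above.  Each packet
 * carries at most (socket buffer - header) bytes of bitmap words; the
 * word offset advances until the whole bitmap has been covered.  Buffer
 * and header sizes below are example values, not the driver's constants.
 */
#if 0
#include <stdio.h>

int main(void)
{
        unsigned long bm_words = 1000, word_offset = 0;
        unsigned int data_size = 4096 - 8;              /* buffer - header */
        unsigned int per_packet = data_size / sizeof(unsigned long);
        int packets = 0;

        while (word_offset < bm_words) {
                unsigned long num_words = bm_words - word_offset;

                if (num_words > per_packet)
                        num_words = per_packet;         /* the min_t() above */
                word_offset += num_words;
                packets++;
        }
        printf("%d packets\n", packets);        /* 2 on 64 bit: 511 + 489 */
        return 0;
}
#endif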
4123
4124 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4125 {
4126         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4127 }
4128
4129 static int dcbp_get_start(struct p_compressed_bm *p)
4130 {
4131         return (p->encoding & 0x80) != 0;
4132 }
4133
4134 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4135 {
4136         return (p->encoding >> 4) & 0x7;
4137 }
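
/*
 * Illustration (standalone sketch, not part of the driver): the three
 * dcbp_* helpers above unpack a single "encoding" byte laid out as
 *   bit  7    - start toggle (does the first run describe set bits?)
 *   bits 6..4 - number of pad bits in the last byte of the stream
 *   bits 3..0 - bitmap code (RLE_VLI_Bits being the one in use)
 * Decoding an example byte with the same masks and shifts:
 */
#if 0
#include <stdio.h>

int main(void)
{
        unsigned char encoding = 0xa2;  /* start=1, pad=2, code=2 */

        printf("code=%d pad=%d start=%d\n",
               encoding & 0x0f,          /* dcbp_get_code()     */
               (encoding >> 4) & 0x7,    /* dcbp_get_pad_bits() */
               (encoding & 0x80) != 0);  /* dcbp_get_start()    */
        return 0;
}
#endif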
4138
4139 /**
4140  * recv_bm_rle_bits - decode one RLE-VLI compressed chunk of the bitmap
4141  *
4142  * Return 0 when done, 1 when another iteration is needed, and a negative error
4143  * code upon failure.
4144  */
4145 static int
4146 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4147                  struct p_compressed_bm *p,
4148                  struct bm_xfer_ctx *c,
4149                  unsigned int len)
4150 {
4151         struct bitstream bs;
4152         u64 look_ahead;
4153         u64 rl;
4154         u64 tmp;
4155         unsigned long s = c->bit_offset;
4156         unsigned long e;
4157         int toggle = dcbp_get_start(p);
4158         int have;
4159         int bits;
4160
4161         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4162
4163         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4164         if (bits < 0)
4165                 return -EIO;
4166
4167         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4168                 bits = vli_decode_bits(&rl, look_ahead);
4169                 if (bits <= 0)
4170                         return -EIO;
4171
4172                 if (toggle) {
4173                         e = s + rl - 1;
4174                         if (e >= c->bm_bits) {
4175                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4176                                 return -EIO;
4177                         }
4178                         _drbd_bm_set_bits(peer_device->device, s, e);
4179                 }
4180
4181                 if (have < bits) {
4182                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4183                                 have, bits, look_ahead,
4184                                 (unsigned int)(bs.cur.b - p->code),
4185                                 (unsigned int)bs.buf_len);
4186                         return -EIO;
4187                 }
4188                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4189                 if (likely(bits < 64))
4190                         look_ahead >>= bits;
4191                 else
4192                         look_ahead = 0;
4193                 have -= bits;
4194
4195                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4196                 if (bits < 0)
4197                         return -EIO;
4198                 look_ahead |= tmp << have;
4199                 have += bits;
4200         }
4201
4202         c->bit_offset = s;
4203         bm_xfer_ctx_bit_to_word_offset(c);
4204
4205         return (s != c->bm_bits);
4206 }
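
/*
 * Illustration (standalone sketch, not part of the driver): the core of
 * the run-length scheme decoded by recv_bm_rle_bits() above.  Runs
 * alternate between clear bits and set bits, only the run lengths are
 * transmitted, and the start toggle says whether the first run is a
 * set-run.  Here a plain array of run lengths stands in for the VLI
 * bitstream, which this sketch does not decode:
 */
#if 0
#include <stdio.h>

int main(void)
{
        unsigned long rl[] = { 3, 2, 4, 1 };    /* decoded run lengths */
        int toggle = 0;                         /* dcbp_get_start(p)   */
        unsigned long s = 0;                    /* c->bit_offset       */
        unsigned int i;

        for (i = 0; i < 4; s += rl[i], toggle = !toggle, i++) {
                if (toggle)     /* a set-run covers bits s .. s+rl-1 */
                        printf("set bits %lu..%lu\n", s, s + rl[i] - 1);
        }
        /* reconstructs 0001100001: bits 3..4 and bit 9 set */
        return 0;
}
#endif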
4207
4208 /**
4209  * decode_bitmap_c
4210  *
4211  * Return 0 when done, 1 when another iteration is needed, and a negative error
4212  * code upon failure.
4213  */
4214 static int
4215 decode_bitmap_c(struct drbd_peer_device *peer_device,
4216                 struct p_compressed_bm *p,
4217                 struct bm_xfer_ctx *c,
4218                 unsigned int len)
4219 {
4220         if (dcbp_get_code(p) == RLE_VLI_Bits)
4221                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4222
4223         /* other variants had been implemented for evaluation,
4224          * but have been dropped as this one turned out to be "best"
4225          * during all our tests. */
4226
4227         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4228         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4229         return -EIO;
4230 }
4231
4232 void INFO_bm_xfer_stats(struct drbd_device *device,
4233                 const char *direction, struct bm_xfer_ctx *c)
4234 {
4235         /* what would it take to transfer it "plaintext" */
4236         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4237         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4238         unsigned int plain =
4239                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4240                 c->bm_words * sizeof(unsigned long);
4241         unsigned int total = c->bytes[0] + c->bytes[1];
4242         unsigned int r;
4243
4244         /* total can not be zero. but just in case: */
4245         if (total == 0)
4246                 return;
4247
4248         /* don't report if not compressed */
4249         if (total >= plain)
4250                 return;
4251
4252         /* total < plain. check for overflow, still */
4253         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4254                                     : (1000 * total / plain);
4255
4256         if (r > 1000)
4257                 r = 1000;
4258
4259         r = 1000 - r;
4260         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4261              "total %u; compression: %u.%u%%\n",
4262                         direction,
4263                         c->bytes[1], c->packets[1],
4264                         c->bytes[0], c->packets[0],
4265                         total, r/10, r % 10);
4266 }
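
/*
 * Illustration (standalone sketch, not part of the driver): the
 * parts-per-thousand computation of INFO_bm_xfer_stats() above, with
 * the same overflow guard for large transfers.  Example numbers only:
 */
#if 0
#include <limits.h>
#include <stdio.h>

int main(void)
{
        unsigned int total = 1200, plain = 100000;      /* bytes */
        unsigned int r;

        if (total == 0 || total >= plain)
                return 0;       /* nothing saved, don't report */

        /* 1000 * total may overflow; divide first in that case */
        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
                                    : (1000 * total / plain);
        if (r > 1000)
                r = 1000;
        r = 1000 - r;
        printf("compression: %u.%u%%\n", r/10, r % 10); /* 98.8% */
        return 0;
}
#endif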
4267
4268 /* Since we are processing the bitfield from lower addresses to higher,
4269    it does not matter whether we process it in 32 bit or 64 bit chunks,
4270    as long as it is little endian. (Understand it as a byte stream,
4271    beginning with the lowest byte...) If we used big endian, we would
4272    need to process it from the highest address to the lowest in order
4273    to be agnostic to the 32 vs 64 bit issue.
4274
4275    Returns 0 on failure, 1 if we successfully received it. */
4276 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4277 {
4278         struct drbd_peer_device *peer_device;
4279         struct drbd_device *device;
4280         struct bm_xfer_ctx c;
4281         int err;
4282
4283         peer_device = conn_peer_device(connection, pi->vnr);
4284         if (!peer_device)
4285                 return -EIO;
4286         device = peer_device->device;
4287
4288         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4289         /* you are supposed to send additional out-of-sync information
4290          * if you actually set bits during this phase */
4291
4292         c = (struct bm_xfer_ctx) {
4293                 .bm_bits = drbd_bm_bits(device),
4294                 .bm_words = drbd_bm_words(device),
4295         };
4296
4297         for (;;) {
4298                 if (pi->cmd == P_BITMAP)
4299                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4300                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4301                         /* MAYBE: sanity check that we speak proto >= 90,
4302                          * and the feature is enabled! */
4303                         struct p_compressed_bm *p = pi->data;
4304
4305                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4306                                 drbd_err(device, "ReportCBitmap packet too large\n");
4307                                 err = -EIO;
4308                                 goto out;
4309                         }
4310                         if (pi->size <= sizeof(*p)) {
4311                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4312                                 err = -EIO;
4313                                 goto out;
4314                         }
4315                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4316                         if (err)
4317                                 goto out;
4318                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4319                 } else {
4320                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4321                         err = -EIO;
4322                         goto out;
4323                 }
4324
4325                 c.packets[pi->cmd == P_BITMAP]++;
4326                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4327
4328                 if (err <= 0) {
4329                         if (err < 0)
4330                                 goto out;
4331                         break;
4332                 }
4333                 err = drbd_recv_header(peer_device->connection, pi);
4334                 if (err)
4335                         goto out;
4336         }
4337
4338         INFO_bm_xfer_stats(device, "receive", &c);
4339
4340         if (device->state.conn == C_WF_BITMAP_T) {
4341                 enum drbd_state_rv rv;
4342
4343                 err = drbd_send_bitmap(device);
4344                 if (err)
4345                         goto out;
4346                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4347                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4348                 D_ASSERT(device, rv == SS_SUCCESS);
4349         } else if (device->state.conn != C_WF_BITMAP_S) {
4350                 /* admin may have requested C_DISCONNECTING,
4351                  * other threads may have noticed network errors */
4352                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4353                     drbd_conn_str(device->state.conn));
4354         }
4355         err = 0;
4356
4357  out:
4358         drbd_bm_unlock(device);
4359         if (!err && device->state.conn == C_WF_BITMAP_S)
4360                 drbd_start_resync(device, C_SYNC_SOURCE);
4361         return err;
4362 }
4363
4364 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4365 {
4366         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4367                  pi->cmd, pi->size);
4368
4369         return ignore_remaining_packet(connection, pi);
4370 }
4371
4372 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4373 {
4374         /* Make sure we've acked all the TCP data associated
4375          * with the data requests being unplugged */
4376         drbd_tcp_quickack(connection->data.socket);
4377
4378         return 0;
4379 }
4380
4381 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4382 {
4383         struct drbd_peer_device *peer_device;
4384         struct drbd_device *device;
4385         struct p_block_desc *p = pi->data;
4386
4387         peer_device = conn_peer_device(connection, pi->vnr);
4388         if (!peer_device)
4389                 return -EIO;
4390         device = peer_device->device;
4391
4392         switch (device->state.conn) {
4393         case C_WF_SYNC_UUID:
4394         case C_WF_BITMAP_T:
4395         case C_BEHIND:
4396                 break;
4397         default:
4398                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4399                                 drbd_conn_str(device->state.conn));
4400         }
4401
4402         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4403
4404         return 0;
4405 }
4406
4407 struct data_cmd {
4408         int expect_payload;
4409         size_t pkt_size;
4410         int (*fn)(struct drbd_connection *, struct packet_info *);
4411 };
4412
4413 static struct data_cmd drbd_cmd_handler[] = {
4414         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4415         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4416         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4417         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4418         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4419         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4420         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4421         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4422         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4423         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4424         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4425         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4426         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4427         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4428         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4429         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4430         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4431         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4432         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4433         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4434         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4435         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4436         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4437         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4438 };
4439
4440 static void drbdd(struct drbd_connection *connection)
4441 {
4442         struct packet_info pi;
4443         size_t shs; /* sub header size */
4444         int err;
4445
4446         while (get_t_state(&connection->receiver) == RUNNING) {
4447                 struct data_cmd *cmd;
4448
4449                 drbd_thread_current_set_cpu(&connection->receiver);
4450                 if (drbd_recv_header(connection, &pi))
4451                         goto err_out;
4452
4453                 cmd = &drbd_cmd_handler[pi.cmd];
4454                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4455                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4456                                  cmdname(pi.cmd), pi.cmd);
4457                         goto err_out;
4458                 }
4459
4460                 shs = cmd->pkt_size;
4461                 if (pi.size > shs && !cmd->expect_payload) {
4462                         drbd_err(connection, "No payload expected %s l:%d\n",
4463                                  cmdname(pi.cmd), pi.size);
4464                         goto err_out;
4465                 }
4466
4467                 if (shs) {
4468                         err = drbd_recv_all_warn(connection, pi.data, shs);
4469                         if (err)
4470                                 goto err_out;
4471                         pi.size -= shs;
4472                 }
4473
4474                 err = cmd->fn(connection, &pi);
4475                 if (err) {
4476                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4477                                  cmdname(pi.cmd), err, pi.size);
4478                         goto err_out;
4479                 }
4480         }
4481         return;
4482
4483     err_out:
4484         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4485 }
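
/*
 * Illustration (standalone sketch, not part of the driver): the
 * table-driven dispatch drbdd() uses above - an array indexed by the
 * packet type, with a bounds check and a NULL-handler check before the
 * call.  Packet types and handlers below are made up for the example:
 */
#if 0
#include <stdio.h>

enum { PKT_HELLO, PKT_UNKNOWN };

struct handler {
        int (*fn)(unsigned int size);
};

static int got_hello(unsigned int size)
{
        printf("hello, l:%u\n", size);
        return 0;
}

static const struct handler tbl[] = {
        [PKT_HELLO] = { got_hello },
};

static int dispatch(unsigned int cmd, unsigned int size)
{
        if (cmd >= sizeof(tbl)/sizeof(tbl[0]) || !tbl[cmd].fn)
                return -1;      /* unexpected packet: tear down */
        return tbl[cmd].fn(size);
}

int main(void)
{
        dispatch(PKT_HELLO, 8);                  /* handled  */
        return dispatch(PKT_UNKNOWN, 0) ? 1 : 0; /* rejected */
}
#endif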
4486
4487 static void conn_disconnect(struct drbd_connection *connection)
4488 {
4489         struct drbd_peer_device *peer_device;
4490         enum drbd_conns oc;
4491         int vnr;
4492
4493         if (connection->cstate == C_STANDALONE)
4494                 return;
4495
4496         /* We are about to start the cleanup after connection loss.
4497          * Make sure drbd_make_request knows about that.
4498          * Usually we should be in some network failure state already,
4499          * but just in case we are not, we fix it up here.
4500          */
4501         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4502
4503         /* asender does not clean up anything. it must not interfere, either */
4504         drbd_thread_stop(&connection->asender);
4505         drbd_free_sock(connection);
4506
4507         rcu_read_lock();
4508         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4509                 struct drbd_device *device = peer_device->device;
4510                 kref_get(&device->kref);
4511                 rcu_read_unlock();
4512                 drbd_disconnected(peer_device);
4513                 kref_put(&device->kref, drbd_destroy_device);
4514                 rcu_read_lock();
4515         }
4516         rcu_read_unlock();
4517
4518         if (!list_empty(&connection->current_epoch->list))
4519                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4520         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4521         atomic_set(&connection->current_epoch->epoch_size, 0);
4522         connection->send.seen_any_write_yet = false;
4523
4524         drbd_info(connection, "Connection closed\n");
4525
4526         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4527                 conn_try_outdate_peer_async(connection);
4528
4529         spin_lock_irq(&connection->resource->req_lock);
4530         oc = connection->cstate;
4531         if (oc >= C_UNCONNECTED)
4532                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4533
4534         spin_unlock_irq(&connection->resource->req_lock);
4535
4536         if (oc == C_DISCONNECTING)
4537                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4538 }
4539
4540 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4541 {
4542         struct drbd_device *device = peer_device->device;
4543         unsigned int i;
4544
4545         /* wait for current activity to cease. */
4546         spin_lock_irq(&device->resource->req_lock);
4547         _drbd_wait_ee_list_empty(device, &device->active_ee);
4548         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4549         _drbd_wait_ee_list_empty(device, &device->read_ee);
4550         spin_unlock_irq(&device->resource->req_lock);
4551
4552         /* We do not have data structures that would allow us to
4553          * get the rs_pending_cnt down to 0 again.
4554          *  * On C_SYNC_TARGET we do not have any data structures describing
4555          *    the pending RSDataRequest's we have sent.
4556          *  * On C_SYNC_SOURCE there is no data structure that tracks
4557          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4558          *  And no, it is not the sum of the reference counts in the
4559          *  resync_LRU. The resync_LRU tracks the whole operation including
4560          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4561          *  on the fly. */
4562         drbd_rs_cancel_all(device);
4563         device->rs_total = 0;
4564         device->rs_failed = 0;
4565         atomic_set(&device->rs_pending_cnt, 0);
4566         wake_up(&device->misc_wait);
4567
4568         del_timer_sync(&device->resync_timer);
4569         resync_timer_fn((unsigned long)device);
4570
4571         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4572          * w_make_resync_request etc. which may still be on the worker queue
4573          * to be "canceled" */
4574         drbd_flush_workqueue(&peer_device->connection->sender_work);
4575
4576         drbd_finish_peer_reqs(device);
4577
4578         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4579            might have queued work again. The flush before drbd_finish_peer_reqs() is
4580            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4581         drbd_flush_workqueue(&peer_device->connection->sender_work);
4582
4583         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4584          * again via drbd_try_clear_on_disk_bm(). */
4585         drbd_rs_cancel_all(device);
4586
4587         kfree(device->p_uuid);
4588         device->p_uuid = NULL;
4589
4590         if (!drbd_suspended(device))
4591                 tl_clear(peer_device->connection);
4592
4593         drbd_md_sync(device);
4594
4595         /* serialize with bitmap writeout triggered by the state change,
4596          * if any. */
4597         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4598
4599         /* tcp_close and release of sendpage pages can be deferred.  I don't
4600          * want to use SO_LINGER, because apparently it can be deferred for
4601          * more than 20 seconds (longest time I checked).
4602          *
4603          * Actually we don't care for exactly when the network stack does its
4604          * put_page(), but release our reference on these pages right here.
4605          */
4606         i = drbd_free_peer_reqs(device, &device->net_ee);
4607         if (i)
4608                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4609         i = atomic_read(&device->pp_in_use_by_net);
4610         if (i)
4611                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4612         i = atomic_read(&device->pp_in_use);
4613         if (i)
4614                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4615
4616         D_ASSERT(device, list_empty(&device->read_ee));
4617         D_ASSERT(device, list_empty(&device->active_ee));
4618         D_ASSERT(device, list_empty(&device->sync_ee));
4619         D_ASSERT(device, list_empty(&device->done_ee));
4620
4621         return 0;
4622 }
4623
4624 /*
4625  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4626  * we can agree on is stored in agreed_pro_version.
4627  *
4628  * feature flags and the reserved array should be enough room for future
4629  * enhancements of the handshake protocol, and possible plugins...
4630  *
4631  * for now, they are expected to be zero, but ignored.
4632  */
4633 static int drbd_send_features(struct drbd_connection *connection)
4634 {
4635         struct drbd_socket *sock;
4636         struct p_connection_features *p;
4637
4638         sock = &connection->data;
4639         p = conn_prepare_command(connection, sock);
4640         if (!p)
4641                 return -EIO;
4642         memset(p, 0, sizeof(*p));
4643         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4644         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4645         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4646 }
4647
4648 /*
4649  * return values:
4650  *   1 yes, we have a valid connection
4651  *   0 oops, did not work out, please try again
4652  *  -1 peer talks different language,
4653  *     no point in trying again, please go standalone.
4654  */
4655 static int drbd_do_features(struct drbd_connection *connection)
4656 {
4657         /* ASSERT current == connection->receiver ... */
4658         struct p_connection_features *p;
4659         const int expect = sizeof(struct p_connection_features);
4660         struct packet_info pi;
4661         int err;
4662
4663         err = drbd_send_features(connection);
4664         if (err)
4665                 return 0;
4666
4667         err = drbd_recv_header(connection, &pi);
4668         if (err)
4669                 return 0;
4670
4671         if (pi.cmd != P_CONNECTION_FEATURES) {
4672                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4673                          cmdname(pi.cmd), pi.cmd);
4674                 return -1;
4675         }
4676
4677         if (pi.size != expect) {
4678                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4679                      expect, pi.size);
4680                 return -1;
4681         }
4682
4683         p = pi.data;
4684         err = drbd_recv_all_warn(connection, p, expect);
4685         if (err)
4686                 return 0;
4687
4688         p->protocol_min = be32_to_cpu(p->protocol_min);
4689         p->protocol_max = be32_to_cpu(p->protocol_max);
4690         if (p->protocol_max == 0)
4691                 p->protocol_max = p->protocol_min;
4692
4693         if (PRO_VERSION_MAX < p->protocol_min ||
4694             PRO_VERSION_MIN > p->protocol_max)
4695                 goto incompat;
4696
4697         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4698
4699         drbd_info(connection, "Handshake successful: "
4700              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4701
4702         return 1;
4703
4704  incompat:
4705         drbd_err(connection, "incompatible DRBD dialects: "
4706             "I support %d-%d, peer supports %d-%d\n",
4707             PRO_VERSION_MIN, PRO_VERSION_MAX,
4708             p->protocol_min, p->protocol_max);
4709         return -1;
4710 }
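
/*
 * Illustration (standalone sketch, not part of the driver): the
 * agreement rule of drbd_do_features() above.  The two version ranges
 * must overlap, and the agreed version is the highest one both sides
 * support; a peer reporting max == 0 only speaks its minimum version.
 * The version numbers below are examples, not the tree's constants:
 */
#if 0
#include <stdio.h>

static int agree(int my_min, int my_max, int peer_min, int peer_max)
{
        if (peer_max == 0)
                peer_max = peer_min;
        if (my_max < peer_min || my_min > peer_max)
                return -1;      /* incompatible dialects */
        return my_max < peer_max ? my_max : peer_max;
}

int main(void)
{
        printf("%d\n", agree(86, 101, 87, 96));   /* -> 96 */
        printf("%d\n", agree(86, 101, 102, 110)); /* -> -1 */
        return 0;
}
#endif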
4711
4712 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4713 static int drbd_do_auth(struct drbd_connection *connection)
4714 {
4715         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4716         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4717         return -1;
4718 }
4719 #else
4720 #define CHALLENGE_LEN 64
4721
4722 /* Return value:
4723         1 - auth succeeded,
4724         0 - failed, try again (network error),
4725         -1 - auth failed, don't try again.
4726 */
4727
4728 static int drbd_do_auth(struct drbd_connection *connection)
4729 {
4730         struct drbd_socket *sock;
4731         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4732         struct scatterlist sg;
4733         char *response = NULL;
4734         char *right_response = NULL;
4735         char *peers_ch = NULL;
4736         unsigned int key_len;
4737         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4738         unsigned int resp_size;
4739         struct hash_desc desc;
4740         struct packet_info pi;
4741         struct net_conf *nc;
4742         int err, rv;
4743
4744         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4745
4746         rcu_read_lock();
4747         nc = rcu_dereference(connection->net_conf);
4748         key_len = strlen(nc->shared_secret);
4749         memcpy(secret, nc->shared_secret, key_len);
4750         rcu_read_unlock();
4751
4752         desc.tfm = connection->cram_hmac_tfm;
4753         desc.flags = 0;
4754
4755         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4756         if (rv) {
4757                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4758                 rv = -1;
4759                 goto fail;
4760         }
4761
4762         get_random_bytes(my_challenge, CHALLENGE_LEN);
4763
4764         sock = &connection->data;
4765         if (!conn_prepare_command(connection, sock)) {
4766                 rv = 0;
4767                 goto fail;
4768         }
4769         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4770                                 my_challenge, CHALLENGE_LEN);
4771         if (!rv)
4772                 goto fail;
4773
4774         err = drbd_recv_header(connection, &pi);
4775         if (err) {
4776                 rv = 0;
4777                 goto fail;
4778         }
4779
4780         if (pi.cmd != P_AUTH_CHALLENGE) {
4781                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4782                          cmdname(pi.cmd), pi.cmd);
4783                 rv = 0;
4784                 goto fail;
4785         }
4786
4787         if (pi.size > CHALLENGE_LEN * 2) {
4788                 drbd_err(connection, "AuthChallenge payload too big.\n");
4789                 rv = -1;
4790                 goto fail;
4791         }
4792
4793         peers_ch = kmalloc(pi.size, GFP_NOIO);
4794         if (peers_ch == NULL) {
4795                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4796                 rv = -1;
4797                 goto fail;
4798         }
4799
4800         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4801         if (err) {
4802                 rv = 0;
4803                 goto fail;
4804         }
4805
4806         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4807         response = kmalloc(resp_size, GFP_NOIO);
4808         if (response == NULL) {
4809                 drbd_err(connection, "kmalloc of response failed\n");
4810                 rv = -1;
4811                 goto fail;
4812         }
4813
4814         sg_init_table(&sg, 1);
4815         sg_set_buf(&sg, peers_ch, pi.size);
4816
4817         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4818         if (rv) {
4819                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4820                 rv = -1;
4821                 goto fail;
4822         }
4823
4824         if (!conn_prepare_command(connection, sock)) {
4825                 rv = 0;
4826                 goto fail;
4827         }
4828         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4829                                 response, resp_size);
4830         if (!rv)
4831                 goto fail;
4832
4833         err = drbd_recv_header(connection, &pi);
4834         if (err) {
4835                 rv = 0;
4836                 goto fail;
4837         }
4838
4839         if (pi.cmd != P_AUTH_RESPONSE) {
4840                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4841                          cmdname(pi.cmd), pi.cmd);
4842                 rv = 0;
4843                 goto fail;
4844         }
4845
4846         if (pi.size != resp_size) {
4847                 drbd_err(connection, "AuthResponse payload has wrong size\n");
4848                 rv = 0;
4849                 goto fail;
4850         }
4851
4852         err = drbd_recv_all_warn(connection, response, resp_size);
4853         if (err) {
4854                 rv = 0;
4855                 goto fail;
4856         }
4857
4858         right_response = kmalloc(resp_size, GFP_NOIO);
4859         if (right_response == NULL) {
4860                 drbd_err(connection, "kmalloc of right_response failed\n");
4861                 rv = -1;
4862                 goto fail;
4863         }
4864
4865         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4866
4867         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4868         if (rv) {
4869                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4870                 rv = -1;
4871                 goto fail;
4872         }
4873
4874         rv = !memcmp(response, right_response, resp_size);
4875
4876         if (rv)
4877                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4878                      resp_size);
4879         else
4880                 rv = -1;
4881
4882  fail:
4883         kfree(peers_ch);
4884         kfree(response);
4885         kfree(right_response);
4886
4887         return rv;
4888 }
4889 #endif
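
/*
 * Illustration (standalone sketch, not part of the driver): the shape
 * of the challenge/response exchange in drbd_do_auth() above.  Both
 * sides know the shared secret; each sends a random challenge and
 * proves knowledge of the secret by returning an HMAC over the peer's
 * challenge.  toy_mac() is a made-up stand-in for the kernel crypto
 * HMAC - it is NOT cryptographic and only illustrates the data flow:
 */
#if 0
#include <stdio.h>
#include <string.h>

#define LEN 8

static void toy_mac(const char *key, const char *msg, char *out)
{
        size_t i, klen = strlen(key);

        for (i = 0; i < LEN; i++)
                out[i] = key[i % klen] ^ msg[i];
}

int main(void)
{
        const char *secret = "shared";          /* nc->shared_secret  */
        char my_challenge[LEN] = "AB3kd0Pq";    /* get_random_bytes() */
        char response[LEN], right_response[LEN];

        /* peer: MAC over the challenge we sent it */
        toy_mac(secret, my_challenge, response);
        /* us: compute what the response ought to be, then compare */
        toy_mac(secret, my_challenge, right_response);
        printf("auth %s\n", memcmp(response, right_response, LEN)
               ? "failed" : "succeeded");
        return 0;
}
#endif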
4890
4891 int drbd_receiver(struct drbd_thread *thi)
4892 {
4893         struct drbd_connection *connection = thi->connection;
4894         int h;
4895
4896         drbd_info(connection, "receiver (re)started\n");
4897
4898         do {
4899                 h = conn_connect(connection);
4900                 if (h == 0) {
4901                         conn_disconnect(connection);
4902                         schedule_timeout_interruptible(HZ);
4903                 }
4904                 if (h == -1) {
4905                         drbd_warn(connection, "Discarding network configuration.\n");
4906                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4907                 }
4908         } while (h == 0);
4909
4910         if (h > 0)
4911                 drbdd(connection);
4912
4913         conn_disconnect(connection);
4914
4915         drbd_info(connection, "receiver terminated\n");
4916         return 0;
4917 }
4918
4919 /* ********* acknowledge sender ******** */
4920
4921 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4922 {
4923         struct p_req_state_reply *p = pi->data;
4924         int retcode = be32_to_cpu(p->retcode);
4925
4926         if (retcode >= SS_SUCCESS) {
4927                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4928         } else {
4929                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4930                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4931                          drbd_set_st_err_str(retcode), retcode);
4932         }
4933         wake_up(&connection->ping_wait);
4934
4935         return 0;
4936 }
4937
4938 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4939 {
4940         struct drbd_peer_device *peer_device;
4941         struct drbd_device *device;
4942         struct p_req_state_reply *p = pi->data;
4943         int retcode = be32_to_cpu(p->retcode);
4944
4945         peer_device = conn_peer_device(connection, pi->vnr);
4946         if (!peer_device)
4947                 return -EIO;
4948         device = peer_device->device;
4949
4950         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
4951                 D_ASSERT(device, connection->agreed_pro_version < 100);
4952                 return got_conn_RqSReply(connection, pi);
4953         }
4954
4955         if (retcode >= SS_SUCCESS) {
4956                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
4957         } else {
4958                 set_bit(CL_ST_CHG_FAIL, &device->flags);
4959                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
4960                         drbd_set_st_err_str(retcode), retcode);
4961         }
4962         wake_up(&device->state_wait);
4963
4964         return 0;
4965 }
4966
4967 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
4968 {
4969         return drbd_send_ping_ack(connection);
4970
4971 }
4972
4973 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
4974 {
4975         /* restore idle timeout */
4976         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int * HZ;
4977         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
4978                 wake_up(&connection->ping_wait);
4979
4980         return 0;
4981 }
4982
4983 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
4984 {
4985         struct drbd_peer_device *peer_device;
4986         struct drbd_device *device;
4987         struct p_block_ack *p = pi->data;
4988         sector_t sector = be64_to_cpu(p->sector);
4989         int blksize = be32_to_cpu(p->blksize);
4990
4991         peer_device = conn_peer_device(connection, pi->vnr);
4992         if (!peer_device)
4993                 return -EIO;
4994         device = peer_device->device;
4995
4996         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
4997
4998         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
4999
5000         if (get_ldev(device)) {
5001                 drbd_rs_complete_io(device, sector);
5002                 drbd_set_in_sync(device, sector, blksize);
5003                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5004                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5005                 put_ldev(device);
5006         }
5007         dec_rs_pending(device);
5008         atomic_add(blksize >> 9, &device->rs_sect_in);
5009
5010         return 0;
5011 }
5012
5013 static int
5014 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5015                               struct rb_root *root, const char *func,
5016                               enum drbd_req_event what, bool missing_ok)
5017 {
5018         struct drbd_request *req;
5019         struct bio_and_error m;
5020
5021         spin_lock_irq(&device->resource->req_lock);
5022         req = find_request(device, root, id, sector, missing_ok, func);
5023         if (unlikely(!req)) {
5024                 spin_unlock_irq(&device->resource->req_lock);
5025                 return -EIO;
5026         }
5027         __req_mod(req, what, &m);
5028         spin_unlock_irq(&device->resource->req_lock);
5029
5030         if (m.bio)
5031                 complete_master_bio(device, &m);
5032         return 0;
5033 }
5034
5035 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5036 {
5037         struct drbd_peer_device *peer_device;
5038         struct drbd_device *device;
5039         struct p_block_ack *p = pi->data;
5040         sector_t sector = be64_to_cpu(p->sector);
5041         int blksize = be32_to_cpu(p->blksize);
5042         enum drbd_req_event what;
5043
5044         peer_device = conn_peer_device(connection, pi->vnr);
5045         if (!peer_device)
5046                 return -EIO;
5047         device = peer_device->device;
5048
5049         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5050
5051         if (p->block_id == ID_SYNCER) {
5052                 drbd_set_in_sync(device, sector, blksize);
5053                 dec_rs_pending(device);
5054                 return 0;
5055         }
5056         switch (pi->cmd) {
5057         case P_RS_WRITE_ACK:
5058                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5059                 break;
5060         case P_WRITE_ACK:
5061                 what = WRITE_ACKED_BY_PEER;
5062                 break;
5063         case P_RECV_ACK:
5064                 what = RECV_ACKED_BY_PEER;
5065                 break;
5066         case P_SUPERSEDED:
5067                 what = CONFLICT_RESOLVED;
5068                 break;
5069         case P_RETRY_WRITE:
5070                 what = POSTPONE_WRITE;
5071                 break;
5072         default:
5073                 BUG();
5074         }
5075
5076         return validate_req_change_req_state(device, p->block_id, sector,
5077                                              &device->write_requests, __func__,
5078                                              what, false);
5079 }
5080
5081 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5082 {
5083         struct drbd_peer_device *peer_device;
5084         struct drbd_device *device;
5085         struct p_block_ack *p = pi->data;
5086         sector_t sector = be64_to_cpu(p->sector);
5087         int size = be32_to_cpu(p->blksize);
5088         int err;
5089
5090         peer_device = conn_peer_device(connection, pi->vnr);
5091         if (!peer_device)
5092                 return -EIO;
5093         device = peer_device->device;
5094
5095         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5096
5097         if (p->block_id == ID_SYNCER) {
5098                 dec_rs_pending(device);
5099                 drbd_rs_failed_io(device, sector, size);
5100                 return 0;
5101         }
5102
5103         err = validate_req_change_req_state(device, p->block_id, sector,
5104                                             &device->write_requests, __func__,
5105                                             NEG_ACKED, true);
5106         if (err) {
5107                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5108                    The master bio might already be completed, therefore the
5109                    request is no longer in the collision hash. */
5110                 /* In Protocol B we might already have got a P_RECV_ACK
5111                    but then get a P_NEG_ACK afterwards. */
5112                 drbd_set_out_of_sync(device, sector, size);
5113         }
5114         return 0;
5115 }
5116
5117 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5118 {
5119         struct drbd_peer_device *peer_device;
5120         struct drbd_device *device;
5121         struct p_block_ack *p = pi->data;
5122         sector_t sector = be64_to_cpu(p->sector);
5123
5124         peer_device = conn_peer_device(connection, pi->vnr);
5125         if (!peer_device)
5126                 return -EIO;
5127         device = peer_device->device;
5128
5129         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5130
5131         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5132             (unsigned long long)sector, be32_to_cpu(p->blksize));
5133
5134         return validate_req_change_req_state(device, p->block_id, sector,
5135                                              &device->read_requests, __func__,
5136                                              NEG_ACKED, false);
5137 }
5138
5139 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5140 {
5141         struct drbd_peer_device *peer_device;
5142         struct drbd_device *device;
5143         sector_t sector;
5144         int size;
5145         struct p_block_ack *p = pi->data;
5146
5147         peer_device = conn_peer_device(connection, pi->vnr);
5148         if (!peer_device)
5149                 return -EIO;
5150         device = peer_device->device;
5151
5152         sector = be64_to_cpu(p->sector);
5153         size = be32_to_cpu(p->blksize);
5154
5155         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5156
5157         dec_rs_pending(device);
5158
5159         if (get_ldev_if_state(device, D_FAILED)) {
5160                 drbd_rs_complete_io(device, sector);
5161                 switch (pi->cmd) {
5162                 case P_NEG_RS_DREPLY:
5163                         drbd_rs_failed_io(device, sector, size);
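                        /* fall through */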
5164                 case P_RS_CANCEL:
5165                         break;
5166                 default:
5167                         BUG();
5168                 }
5169                 put_ldev(device);
5170         }
5171
5172         return 0;
5173 }
5174
5175 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5176 {
5177         struct p_barrier_ack *p = pi->data;
5178         struct drbd_peer_device *peer_device;
5179         int vnr;
5180
5181         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5182
5183         rcu_read_lock();
5184         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5185                 struct drbd_device *device = peer_device->device;
5186
5187                 if (device->state.conn == C_AHEAD &&
5188                     atomic_read(&device->ap_in_flight) == 0 &&
5189                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5190                         device->start_resync_timer.expires = jiffies + HZ;
5191                         add_timer(&device->start_resync_timer);
5192                 }
5193         }
5194         rcu_read_unlock();
5195
5196         return 0;
5197 }
5198
5199 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5200 {
5201         struct drbd_peer_device *peer_device;
5202         struct drbd_device *device;
5203         struct p_block_ack *p = pi->data;
5204         struct drbd_device_work *dw;
5205         sector_t sector;
5206         int size;
5207
5208         peer_device = conn_peer_device(connection, pi->vnr);
5209         if (!peer_device)
5210                 return -EIO;
5211         device = peer_device->device;
5212
5213         sector = be64_to_cpu(p->sector);
5214         size = be32_to_cpu(p->blksize);
5215
5216         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5217
5218         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5219                 drbd_ov_out_of_sync_found(device, sector, size);
5220         else
5221                 ov_out_of_sync_print(device);
5222
5223         if (!get_ldev(device))
5224                 return 0;
5225
5226         drbd_rs_complete_io(device, sector);
5227         dec_rs_pending(device);
5228
5229         --device->ov_left;
5230
5231         /* let's advance progress step marks only for every other megabyte */
5232         if ((device->ov_left & 0x200) == 0x200)
5233                 drbd_advance_rs_marks(device, device->ov_left);
5234
5235         if (device->ov_left == 0) {
5236                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5237                 if (dw) {
5238                         dw->w.cb = w_ov_finished;
5239                         dw->device = device;
5240                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5241                 } else {
5242                         drbd_err(device, "kmalloc(dw) failed.\n");
5243                         ov_out_of_sync_print(device);
5244                         drbd_resync_finished(device);
5245                 }
5246         }
5247         put_ldev(device);
5248         return 0;
5249 }
5250
5251 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5252 {
5253         return 0;
5254 }
5255
5256 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5257 {
5258         struct drbd_peer_device *peer_device;
5259         int vnr, not_empty = 0;
5260
5261         do {
5262                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5263                 flush_signals(current);
5264
5265                 rcu_read_lock();
5266                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5267                         struct drbd_device *device = peer_device->device;
5268                         kref_get(&device->kref);
5269                         rcu_read_unlock();
5270                         if (drbd_finish_peer_reqs(device)) {
5271                                 kref_put(&device->kref, drbd_destroy_device);
5272                                 return 1;
5273                         }
5274                         kref_put(&device->kref, drbd_destroy_device);
5275                         rcu_read_lock();
5276                 }
5277                 set_bit(SIGNAL_ASENDER, &connection->flags);
5278
5279                 spin_lock_irq(&connection->resource->req_lock);
5280                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5281                         struct drbd_device *device = peer_device->device;
5282                         not_empty = !list_empty(&device->done_ee);
5283                         if (not_empty)
5284                                 break;
5285                 }
5286                 spin_unlock_irq(&connection->resource->req_lock);
5287                 rcu_read_unlock();
5288         } while (not_empty);
5289
5290         return 0;
5291 }
5292
5293 struct asender_cmd {
5294         size_t pkt_size;
5295         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5296 };
5297
5298 static struct asender_cmd asender_tbl[] = {
5299         [P_PING]            = { 0, got_Ping },
5300         [P_PING_ACK]        = { 0, got_PingAck },
5301         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5302         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5303         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5304         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5305         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5306         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5307         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5308         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5309         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5310         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5311         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5312         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5313         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5314         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5315         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5316 };
5317
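/* The asender thread: services the meta socket.  It sends a ping whenever
 * SEND_PING is set, flushes completed peer requests, and receives and
 * dispatches acknowledgement packets through asender_tbl until the
 * connection winds down. */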
int drbd_asender(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct asender_cmd *cmd = NULL;
	struct packet_info pi;
	int rv;
	void *buf    = connection->meta.rbuf;
	int received = 0;
	unsigned int header_size = drbd_header_size(connection);
	int expect   = header_size;
	bool ping_timeout_active = false;
	struct net_conf *nc;
	int ping_timeo, tcp_cork, ping_int;
	struct sched_param param = { .sched_priority = 2 };

	rv = sched_setscheduler(current, SCHED_RR, &param);
	if (rv < 0)
		drbd_err(connection, "drbd_asender: failed to set priority, ret=%d\n", rv);

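	/* Main loop: alternate between sending (ping, ack processing) and
	 * receiving meta packets, until the thread is told to stop. */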
	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

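		/* The configuration may change at runtime; sample the
		 * RCU-protected net_conf once per iteration. */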
		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		ping_timeo = nc->ping_timeo;
		tcp_cork = nc->tcp_cork;
		ping_int = nc->ping_int;
		rcu_read_unlock();

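		/* If a ping was requested, send it and arm the ping timeout on
		 * the meta socket; ping_timeo is configured in deciseconds. */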
		if (test_and_clear_bit(SEND_PING, &connection->flags)) {
			if (drbd_send_ping(connection)) {
				drbd_err(connection, "drbd_send_ping has failed\n");
				goto reconnect;
			}
			connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
			ping_timeout_active = true;
		}

		/* TODO: conditionally cork; it may hurt latency if we cork without
		   much to send */
		if (tcp_cork)
			drbd_tcp_cork(connection->meta.socket);
		if (connection_finish_peer_reqs(connection)) {
			drbd_err(connection, "connection_finish_peer_reqs() failed\n");
			goto reconnect;
		}
		/* but always uncork again afterwards, unless corking is disabled */
		if (tcp_cork)
			drbd_tcp_uncork(connection->meta.socket);

		/* short circuit, recv_msg would return -EINTR anyway. */
		if (signal_pending(current))
			continue;

		rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
		clear_bit(SIGNAL_ASENDER, &connection->flags);

		flush_signals(current);

		/* Note:
		 * -EINTR        (on meta) we got a signal
		 * -EAGAIN       (on meta) rcvtimeo expired
		 * -ECONNRESET   other side closed the connection
		 * -ERESTARTSYS  (on data) we got a signal
		 * rv <  0       other than above: unexpected error!
		 * rv == expected: full header or command
		 * rv <  expected: "woken" by signal during receive
		 * rv == 0       : "connection shut down by peer"
		 */
		if (likely(rv > 0)) {
			received += rv;
			buf      += rv;
		} else if (rv == 0) {
			if (test_bit(DISCONNECT_SENT, &connection->flags)) {
				long t;
				rcu_read_lock();
				t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
				rcu_read_unlock();

				t = wait_event_timeout(connection->ping_wait,
						       connection->cstate < C_WF_REPORT_PARAMS,
						       t);
				if (t)
					break;
			}
			drbd_err(connection, "meta connection shut down by peer.\n");
			goto reconnect;
		} else if (rv == -EAGAIN) {
			/* If the data socket received something meanwhile,
			 * that is good enough: peer is still alive. */
			if (time_after(connection->last_received,
				jiffies - connection->meta.socket->sk->sk_rcvtimeo))
				continue;
			if (ping_timeout_active) {
				drbd_err(connection, "PingAck did not arrive in time.\n");
				goto reconnect;
			}
			set_bit(SEND_PING, &connection->flags);
			continue;
		} else if (rv == -EINTR) {
			continue;
		} else {
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
			goto reconnect;
		}

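		/* A complete header has arrived: decode it, validate the
		 * command against the dispatch table, and learn how much
		 * payload to expect before invoking the handler. */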
		if (received == expect && cmd == NULL) {
			if (decode_header(connection, connection->meta.rbuf, &pi))
				goto reconnect;
			if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
				drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
					 cmdname(pi.cmd), pi.cmd);
				goto disconnect;
			}
			cmd = &asender_tbl[pi.cmd];
			expect = header_size + cmd->pkt_size;
			if (pi.size != expect - header_size) {
				drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
					pi.cmd, pi.size);
				goto reconnect;
			}
		}
		if (received == expect) {
			bool err;

			err = cmd->fn(connection, &pi);
			if (err) {
				drbd_err(connection, "%pf failed\n", cmd->fn);
				goto reconnect;
			}

			connection->last_received = jiffies;

			if (cmd == &asender_tbl[P_PING_ACK]) {
				/* restore idle timeout */
				connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
				ping_timeout_active = false;
			}

			buf      = connection->meta.rbuf;
			received = 0;
			expect   = header_size;
			cmd      = NULL;
		}
	}

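	/* Error exits, reachable only via goto: a transport problem asks for
	 * C_NETWORK_FAILURE (and thus a reconnect attempt), while a protocol
	 * violation asks for C_DISCONNECTING. */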
	if (0) {
reconnect:
		conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		conn_md_sync(connection);
	}
	if (0) {
disconnect:
		conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}
	clear_bit(SIGNAL_ASENDER, &connection->flags);

	drbd_info(connection, "asender terminated\n");

	return 0;
}