4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
28 #include <asm/uaccess.h>
31 #include <linux/drbd.h>
33 #include <linux/file.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
47 #include "drbd_protocol.h"
65 static int drbd_do_features(struct drbd_connection *connection);
66 static int drbd_do_auth(struct drbd_connection *connection);
67 static int drbd_disconnected(struct drbd_peer_device *);
69 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
70 static int e_end_block(struct drbd_work *, int);
73 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
76 * some helper functions to deal with singly linked page lists,
77 * page->private being our "next" pointer.
80 /* If at least n pages are linked at head, get n pages off.
81 * Otherwise, don't modify head, and return NULL.
82 * Locking is the responsibility of the caller.
84 static struct page *page_chain_del(struct page **head, int n)
98 tmp = page_chain_next(page);
100 break; /* found sufficient pages */
102 /* insufficient pages, don't use any of them. */
107 /* add end of list marker for the returned list */
108 set_page_private(page, 0);
109 /* actual return value, and adjustment of head */
115 /* may be used outside of locks to find the tail of a (usually short)
116 * "private" page chain, before adding it back to a global chain head
117 * with page_chain_add() under a spinlock. */
118 static struct page *page_chain_tail(struct page *page, int *len)
122 while ((tmp = page_chain_next(page)))
129 static int page_chain_free(struct page *page)
133 page_chain_for_each_safe(page, tmp) {
140 static void page_chain_add(struct page **head,
141 struct page *chain_first, struct page *chain_last)
145 tmp = page_chain_tail(chain_first, NULL);
146 BUG_ON(tmp != chain_last);
149 /* add chain to head */
150 set_page_private(chain_last, (unsigned long)*head);
154 static struct page *__drbd_alloc_pages(struct drbd_device *device,
157 struct page *page = NULL;
158 struct page *tmp = NULL;
161 /* Yes, testing drbd_pp_vacant outside the lock is racy.
162 * So what. It saves a spin_lock. */
163 if (drbd_pp_vacant >= number) {
164 spin_lock(&drbd_pp_lock);
165 page = page_chain_del(&drbd_pp_pool, number);
167 drbd_pp_vacant -= number;
168 spin_unlock(&drbd_pp_lock);
173 /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
174 * "criss-cross" setup, that might cause write-out on some other DRBD,
175 * which in turn might block on the other node at this very place. */
176 for (i = 0; i < number; i++) {
177 tmp = alloc_page(GFP_TRY);
180 set_page_private(tmp, (unsigned long)page);
187 /* Not enough pages immediately available this time.
188 * No need to jump around here, drbd_alloc_pages will retry this
189 * function "soon". */
191 tmp = page_chain_tail(page, NULL);
192 spin_lock(&drbd_pp_lock);
193 page_chain_add(&drbd_pp_pool, page, tmp);
195 spin_unlock(&drbd_pp_lock);
200 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
201 struct list_head *to_be_freed)
203 struct drbd_peer_request *peer_req, *tmp;
205 /* The EEs are always appended to the end of the list. Since
206 they are sent in order over the wire, they have to finish
207 in order. As soon as we see the first unfinished one we can
208 stop examining the list... */
210 list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
211 if (drbd_peer_req_has_active_page(peer_req))
213 list_move(&peer_req->w.list, to_be_freed);
217 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 LIST_HEAD(reclaimed);
220 struct drbd_peer_request *peer_req, *t;
222 spin_lock_irq(&device->resource->req_lock);
223 reclaim_finished_net_peer_reqs(device, &reclaimed);
224 spin_unlock_irq(&device->resource->req_lock);
226 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227 drbd_free_net_peer_req(device, peer_req);
231 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
232 * @peer_device: DRBD peer device.
233 * @number: number of pages requested
234 * @retry: whether to retry, if not enough pages are available right now
236 * Tries to allocate number pages, first from our own page pool, then from
237 * the kernel, unless this allocation would exceed the max_buffers setting.
238 * Possibly retry until DRBD frees sufficient pages somewhere else.
240 * Returns a page chain linked via page->private.
242 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
245 struct drbd_device *device = peer_device->device;
246 struct page *page = NULL;
251 /* Yes, we may run up to @number over max_buffers. If we
252 * follow it strictly, the admin will get it wrong anyways. */
254 nc = rcu_dereference(peer_device->connection->net_conf);
255 mxb = nc ? nc->max_buffers : 1000000;
258 if (atomic_read(&device->pp_in_use) < mxb)
259 page = __drbd_alloc_pages(device, number);
261 while (page == NULL) {
262 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
264 drbd_kick_lo_and_reclaim_net(device);
266 if (atomic_read(&device->pp_in_use) < mxb) {
267 page = __drbd_alloc_pages(device, number);
275 if (signal_pending(current)) {
276 drbd_warn(device, "drbd_alloc_pages interrupted!\n");
282 finish_wait(&drbd_pp_wait, &wait);
285 atomic_add(number, &device->pp_in_use);
289 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
290 * Is also used from inside another spin_lock_irq(&resource->req_lock);
291 * Either links the page chain back to the global pool,
292 * or returns all pages to the system. */
293 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
295 atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
301 if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
302 i = page_chain_free(page);
305 tmp = page_chain_tail(page, &i);
306 spin_lock(&drbd_pp_lock);
307 page_chain_add(&drbd_pp_pool, page, tmp);
309 spin_unlock(&drbd_pp_lock);
311 i = atomic_sub_return(i, a);
313 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
314 is_net ? "pp_in_use_by_net" : "pp_in_use", i);
315 wake_up(&drbd_pp_wait);
319 You need to hold the req_lock:
320 _drbd_wait_ee_list_empty()
322 You must not have the req_lock:
324 drbd_alloc_peer_req()
325 drbd_free_peer_reqs()
327 drbd_finish_peer_reqs()
329 drbd_wait_ee_list_empty()
332 struct drbd_peer_request *
333 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
334 unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
336 struct drbd_device *device = peer_device->device;
337 struct drbd_peer_request *peer_req;
338 struct page *page = NULL;
339 unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
341 if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
344 peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
346 if (!(gfp_mask & __GFP_NOWARN))
347 drbd_err(device, "%s: allocation failed\n", __func__);
352 page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
357 drbd_clear_interval(&peer_req->i);
358 peer_req->i.size = data_size;
359 peer_req->i.sector = sector;
360 peer_req->i.local = false;
361 peer_req->i.waiting = false;
363 peer_req->epoch = NULL;
364 peer_req->peer_device = peer_device;
365 peer_req->pages = page;
366 atomic_set(&peer_req->pending_bios, 0);
369 * The block_id is opaque to the receiver. It is not endianness
370 * converted, and sent back to the sender unchanged.
372 peer_req->block_id = id;
377 mempool_free(peer_req, drbd_ee_mempool);
381 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
384 if (peer_req->flags & EE_HAS_DIGEST)
385 kfree(peer_req->digest);
386 drbd_free_pages(device, peer_req->pages, is_net);
387 D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
388 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
389 mempool_free(peer_req, drbd_ee_mempool);
392 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
394 LIST_HEAD(work_list);
395 struct drbd_peer_request *peer_req, *t;
397 int is_net = list == &device->net_ee;
399 spin_lock_irq(&device->resource->req_lock);
400 list_splice_init(list, &work_list);
401 spin_unlock_irq(&device->resource->req_lock);
403 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
404 __drbd_free_peer_req(device, peer_req, is_net);
411 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
413 static int drbd_finish_peer_reqs(struct drbd_device *device)
415 LIST_HEAD(work_list);
416 LIST_HEAD(reclaimed);
417 struct drbd_peer_request *peer_req, *t;
420 spin_lock_irq(&device->resource->req_lock);
421 reclaim_finished_net_peer_reqs(device, &reclaimed);
422 list_splice_init(&device->done_ee, &work_list);
423 spin_unlock_irq(&device->resource->req_lock);
425 list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
426 drbd_free_net_peer_req(device, peer_req);
428 /* possible callbacks here:
429 * e_end_block, and e_end_resync_block, e_send_superseded.
430 * all ignore the last argument.
432 list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
435 /* list_del not necessary, next/prev members not touched */
436 err2 = peer_req->w.cb(&peer_req->w, !!err);
439 drbd_free_peer_req(device, peer_req);
441 wake_up(&device->ee_wait);
446 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
447 struct list_head *head)
451 /* avoids spin_lock/unlock
452 * and calling prepare_to_wait in the fast path */
453 while (!list_empty(head)) {
454 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
455 spin_unlock_irq(&device->resource->req_lock);
457 finish_wait(&device->ee_wait, &wait);
458 spin_lock_irq(&device->resource->req_lock);
462 static void drbd_wait_ee_list_empty(struct drbd_device *device,
463 struct list_head *head)
465 spin_lock_irq(&device->resource->req_lock);
466 _drbd_wait_ee_list_empty(device, head);
467 spin_unlock_irq(&device->resource->req_lock);
470 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
477 struct msghdr msg = {
479 .msg_iov = (struct iovec *)&iov,
480 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
486 rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
492 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
496 rv = drbd_recv_short(connection->data.socket, buf, size, 0);
499 if (rv == -ECONNRESET)
500 drbd_info(connection, "sock was reset by peer\n");
501 else if (rv != -ERESTARTSYS)
502 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
503 } else if (rv == 0) {
504 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
507 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
510 t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
515 drbd_info(connection, "sock was shut down by peer\n");
519 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
525 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
529 err = drbd_recv(connection, buf, size);
538 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
542 err = drbd_recv_all(connection, buf, size);
543 if (err && !signal_pending(current))
544 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
549 * On individual connections, the socket buffer size must be set prior to the
550 * listen(2) or connect(2) calls in order to have it take effect.
551 * This is our wrapper to do so.
553 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
556 /* open coded SO_SNDBUF, SO_RCVBUF */
558 sock->sk->sk_sndbuf = snd;
559 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
562 sock->sk->sk_rcvbuf = rcv;
563 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
567 static struct socket *drbd_try_connect(struct drbd_connection *connection)
571 struct sockaddr_in6 src_in6;
572 struct sockaddr_in6 peer_in6;
574 int err, peer_addr_len, my_addr_len;
575 int sndbuf_size, rcvbuf_size, connect_int;
576 int disconnect_on_error = 1;
579 nc = rcu_dereference(connection->net_conf);
584 sndbuf_size = nc->sndbuf_size;
585 rcvbuf_size = nc->rcvbuf_size;
586 connect_int = nc->connect_int;
589 my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
590 memcpy(&src_in6, &connection->my_addr, my_addr_len);
592 if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
593 src_in6.sin6_port = 0;
595 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
597 peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
598 memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
600 what = "sock_create_kern";
601 err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
602 SOCK_STREAM, IPPROTO_TCP, &sock);
608 sock->sk->sk_rcvtimeo =
609 sock->sk->sk_sndtimeo = connect_int * HZ;
610 drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
612 /* explicitly bind to the configured IP as source IP
613 * for the outgoing connections.
614 * This is needed for multihomed hosts and to be
615 * able to use lo: interfaces for drbd.
616 * Make sure to use 0 as port number, so linux selects
617 * a free one dynamically.
619 what = "bind before connect";
620 err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
624 /* connect may fail, peer not yet available.
625 * stay C_WF_CONNECTION, don't go Disconnecting! */
626 disconnect_on_error = 0;
628 err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
637 /* timeout, busy, signal pending */
638 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
639 case EINTR: case ERESTARTSYS:
640 /* peer not (yet) available, network problem */
641 case ECONNREFUSED: case ENETUNREACH:
642 case EHOSTDOWN: case EHOSTUNREACH:
643 disconnect_on_error = 0;
646 drbd_err(connection, "%s failed, err = %d\n", what, err);
648 if (disconnect_on_error)
649 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
655 struct accept_wait_data {
656 struct drbd_connection *connection;
657 struct socket *s_listen;
658 struct completion door_bell;
659 void (*original_sk_state_change)(struct sock *sk);
663 static void drbd_incoming_connection(struct sock *sk)
665 struct accept_wait_data *ad = sk->sk_user_data;
666 void (*state_change)(struct sock *sk);
668 state_change = ad->original_sk_state_change;
669 if (sk->sk_state == TCP_ESTABLISHED)
670 complete(&ad->door_bell);
674 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
676 int err, sndbuf_size, rcvbuf_size, my_addr_len;
677 struct sockaddr_in6 my_addr;
678 struct socket *s_listen;
683 nc = rcu_dereference(connection->net_conf);
688 sndbuf_size = nc->sndbuf_size;
689 rcvbuf_size = nc->rcvbuf_size;
692 my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
693 memcpy(&my_addr, &connection->my_addr, my_addr_len);
695 what = "sock_create_kern";
696 err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
697 SOCK_STREAM, IPPROTO_TCP, &s_listen);
703 s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
704 drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
706 what = "bind before listen";
707 err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
711 ad->s_listen = s_listen;
712 write_lock_bh(&s_listen->sk->sk_callback_lock);
713 ad->original_sk_state_change = s_listen->sk->sk_state_change;
714 s_listen->sk->sk_state_change = drbd_incoming_connection;
715 s_listen->sk->sk_user_data = ad;
716 write_unlock_bh(&s_listen->sk->sk_callback_lock);
719 err = s_listen->ops->listen(s_listen, 5);
726 sock_release(s_listen);
728 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
729 drbd_err(connection, "%s failed, err = %d\n", what, err);
730 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
737 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
739 write_lock_bh(&sk->sk_callback_lock);
740 sk->sk_state_change = ad->original_sk_state_change;
741 sk->sk_user_data = NULL;
742 write_unlock_bh(&sk->sk_callback_lock);
745 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
747 int timeo, connect_int, err = 0;
748 struct socket *s_estab = NULL;
752 nc = rcu_dereference(connection->net_conf);
757 connect_int = nc->connect_int;
760 timeo = connect_int * HZ;
761 /* random jitter: +/- 1/7 of timeo (the two outcomes are 28.5% apart) */
762 timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
764 err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
768 err = kernel_accept(ad->s_listen, &s_estab, 0);
770 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
771 drbd_err(connection, "accept failed, err = %d\n", err);
772 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
777 unregister_state_change(s_estab->sk, ad);
782 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
784 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
785 enum drbd_packet cmd)
787 if (!conn_prepare_command(connection, sock))
789 return conn_send_command(connection, sock, cmd, 0, NULL, 0);
792 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
794 unsigned int header_size = drbd_header_size(connection);
795 struct packet_info pi;
798 err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
799 if (err != header_size) {
804 err = decode_header(connection, connection->data.rbuf, &pi);
811 * drbd_socket_okay() - Free the socket if its connection is not okay
812 * @sock: pointer to the pointer to the socket.
814 static int drbd_socket_okay(struct socket **sock)
822 rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
824 if (rr > 0 || rr == -EAGAIN) {
832 /* Gets called if a connection is established, or if a new minor gets created
834 int drbd_connected(struct drbd_peer_device *peer_device)
836 struct drbd_device *device = peer_device->device;
839 atomic_set(&device->packet_seq, 0);
840 device->peer_seq = 0;
842 device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
843 &peer_device->connection->cstate_mutex :
844 &device->own_state_mutex;
846 err = drbd_send_sync_param(peer_device);
848 err = drbd_send_sizes(peer_device, 0, 0);
850 err = drbd_send_uuids(peer_device);
852 err = drbd_send_current_state(peer_device);
853 clear_bit(USE_DEGR_WFC_T, &device->flags);
854 clear_bit(RESIZE_PENDING, &device->flags);
855 atomic_set(&device->ap_in_flight, 0);
856 mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
862 * 1 yes, we have a valid connection
863 * 0 oops, did not work out, please try again
864 * -1 peer talks different language,
865 * no point in trying again, please go standalone.
866 * -2 We do not have a network config...
868 static int conn_connect(struct drbd_connection *connection)
870 struct drbd_socket sock, msock;
871 struct drbd_peer_device *peer_device;
873 int vnr, timeout, h, ok;
874 bool discard_my_data;
875 enum drbd_state_rv rv;
876 struct accept_wait_data ad = {
877 .connection = connection,
878 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
881 clear_bit(DISCONNECT_SENT, &connection->flags);
882 if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
885 mutex_init(&sock.mutex);
886 sock.sbuf = connection->data.sbuf;
887 sock.rbuf = connection->data.rbuf;
889 mutex_init(&msock.mutex);
890 msock.sbuf = connection->meta.sbuf;
891 msock.rbuf = connection->meta.rbuf;
894 /* Assume that the peer only understands protocol 80 until we know better. */
895 connection->agreed_pro_version = 80;
897 if (prepare_listen_socket(connection, &ad))
903 s = drbd_try_connect(connection);
907 send_first_packet(connection, &sock, P_INITIAL_DATA);
908 } else if (!msock.socket) {
909 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
911 send_first_packet(connection, &msock, P_INITIAL_META);
913 drbd_err(connection, "Logic error in conn_connect()\n");
914 goto out_release_sockets;
918 if (sock.socket && msock.socket) {
920 nc = rcu_dereference(connection->net_conf);
921 timeout = nc->ping_timeo * HZ / 10;
923 schedule_timeout_interruptible(timeout);
924 ok = drbd_socket_okay(&sock.socket);
925 ok = drbd_socket_okay(&msock.socket) && ok;
931 s = drbd_wait_for_connect(connection, &ad);
933 int fp = receive_first_packet(connection, s);
934 drbd_socket_okay(&sock.socket);
935 drbd_socket_okay(&msock.socket);
939 drbd_warn(connection, "initial packet S crossed\n");
940 sock_release(sock.socket);
947 set_bit(RESOLVE_CONFLICTS, &connection->flags);
949 drbd_warn(connection, "initial packet M crossed\n");
950 sock_release(msock.socket);
957 drbd_warn(connection, "Error receiving initial packet\n");
960 if (prandom_u32() & 1)
965 if (connection->cstate <= C_DISCONNECTING)
966 goto out_release_sockets;
967 if (signal_pending(current)) {
968 flush_signals(current);
970 if (get_t_state(&connection->receiver) == EXITING)
971 goto out_release_sockets;
974 ok = drbd_socket_okay(&sock.socket);
975 ok = drbd_socket_okay(&msock.socket) && ok;
979 sock_release(ad.s_listen);
981 sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
982 msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
984 sock.socket->sk->sk_allocation = GFP_NOIO;
985 msock.socket->sk->sk_allocation = GFP_NOIO;
987 sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
988 msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
991 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
992 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
993 * first set it to the P_CONNECTION_FEATURES timeout,
994 * which we set to 4x the configured ping_timeout. */
996 nc = rcu_dereference(connection->net_conf);
998 sock.socket->sk->sk_sndtimeo =
999 sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1001 msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1002 timeout = nc->timeout * HZ / 10;
1003 discard_my_data = nc->discard_my_data;
1006 msock.socket->sk->sk_sndtimeo = timeout;
1008 /* we don't want delays.
1009 * we use TCP_CORK where appropriate, though */
1010 drbd_tcp_nodelay(sock.socket);
1011 drbd_tcp_nodelay(msock.socket);
1013 connection->data.socket = sock.socket;
1014 connection->meta.socket = msock.socket;
1015 connection->last_received = jiffies;
1017 h = drbd_do_features(connection);
1021 if (connection->cram_hmac_tfm) {
1022 /* drbd_request_state(device, NS(conn, WFAuth)); */
1023 switch (drbd_do_auth(connection)) {
1025 drbd_err(connection, "Authentication of peer failed\n");
1028 drbd_err(connection, "Authentication of peer failed, trying again.\n");
1033 connection->data.socket->sk->sk_sndtimeo = timeout;
1034 connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1036 if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1039 set_bit(STATE_SENT, &connection->flags);
1042 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1043 struct drbd_device *device = peer_device->device;
1044 kref_get(&device->kref);
1047 /* Prevent a race between resync-handshake and
1048 * being promoted to Primary.
1050 * Grab and release the state mutex, so we know that any current
1051 * drbd_set_role() is finished, and any incoming drbd_set_role
1052 * will see the STATE_SENT flag, and wait for it to be cleared.
1054 mutex_lock(device->state_mutex);
1055 mutex_unlock(device->state_mutex);
1057 if (discard_my_data)
1058 set_bit(DISCARD_MY_DATA, &device->flags);
1060 clear_bit(DISCARD_MY_DATA, &device->flags);
1062 drbd_connected(peer_device);
1063 kref_put(&device->kref, drbd_destroy_device);
1068 rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1069 if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1070 clear_bit(STATE_SENT, &connection->flags);
1074 drbd_thread_start(&connection->asender);
1076 mutex_lock(&connection->resource->conf_update);
1077 /* The discard_my_data flag is a single-shot modifier to the next
1078 * connection attempt, the handshake of which is now well underway.
1079 * No need for rcu style copying of the whole struct
1080 * just to clear a single value. */
1081 connection->net_conf->discard_my_data = 0;
1082 mutex_unlock(&connection->resource->conf_update);
1086 out_release_sockets:
1088 sock_release(ad.s_listen);
1090 sock_release(sock.socket);
1092 sock_release(msock.socket);
1096 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1098 unsigned int header_size = drbd_header_size(connection);
1100 if (header_size == sizeof(struct p_header100) &&
1101 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1102 struct p_header100 *h = header;
1104 drbd_err(connection, "Header padding is not zero\n");
1107 pi->vnr = be16_to_cpu(h->volume);
1108 pi->cmd = be16_to_cpu(h->command);
1109 pi->size = be32_to_cpu(h->length);
1110 } else if (header_size == sizeof(struct p_header95) &&
1111 *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1112 struct p_header95 *h = header;
1113 pi->cmd = be16_to_cpu(h->command);
1114 pi->size = be32_to_cpu(h->length);
1116 } else if (header_size == sizeof(struct p_header80) &&
1117 *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1118 struct p_header80 *h = header;
1119 pi->cmd = be16_to_cpu(h->command);
1120 pi->size = be16_to_cpu(h->length);
1123 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1124 be32_to_cpu(*(__be32 *)header),
1125 connection->agreed_pro_version);
1128 pi->data = header + header_size;
1132 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1134 void *buffer = connection->data.rbuf;
1137 err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1141 err = decode_header(connection, buffer, pi);
1142 connection->last_received = jiffies;
1147 static void drbd_flush(struct drbd_connection *connection)
1150 struct drbd_peer_device *peer_device;
1153 if (connection->write_ordering >= WO_bdev_flush) {
1155 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1156 struct drbd_device *device = peer_device->device;
1158 if (!get_ldev(device))
1160 kref_get(&device->kref);
1163 rv = blkdev_issue_flush(device->ldev->backing_bdev,
1166 drbd_info(device, "local disk flush failed with status %d\n", rv);
1167 /* would rather check on EOPNOTSUPP, but that is not reliable.
1168 * don't try again for ANY return value != 0
1169 * if (rv == -EOPNOTSUPP) */
1170 drbd_bump_write_ordering(connection, WO_drain_io);
1173 kref_put(&device->kref, drbd_destroy_device);
1184 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1185 * @connection: DRBD connection.
1186 * @epoch: Epoch object.
1189 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1190 struct drbd_epoch *epoch,
1191 enum epoch_event ev)
1194 struct drbd_epoch *next_epoch;
1195 enum finish_epoch rv = FE_STILL_LIVE;
1197 spin_lock(&connection->epoch_lock);
1201 epoch_size = atomic_read(&epoch->epoch_size);
1203 switch (ev & ~EV_CLEANUP) {
1205 atomic_dec(&epoch->active);
1207 case EV_GOT_BARRIER_NR:
1208 set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1210 case EV_BECAME_LAST:
1215 if (epoch_size != 0 &&
1216 atomic_read(&epoch->active) == 0 &&
1217 (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1218 if (!(ev & EV_CLEANUP)) {
1219 spin_unlock(&connection->epoch_lock);
1220 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1221 spin_lock(&connection->epoch_lock);
1224 /* FIXME: dec unacked on connection, once we have
1225 * something to count pending connection packets in. */
1226 if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1227 dec_unacked(epoch->connection);
1230 if (connection->current_epoch != epoch) {
1231 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1232 list_del(&epoch->list);
1233 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1234 connection->epochs--;
1237 if (rv == FE_STILL_LIVE)
1241 atomic_set(&epoch->epoch_size, 0);
1242 /* atomic_set(&epoch->active, 0); is already zero */
1243 if (rv == FE_STILL_LIVE)
1254 spin_unlock(&connection->epoch_lock);
1260 * drbd_bump_write_ordering() - Fall back to another write ordering method
1261 * @connection: DRBD connection.
1262 * @wo: Write ordering method to try.
1264 void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
1266 struct disk_conf *dc;
1267 struct drbd_peer_device *peer_device;
1268 enum write_ordering_e pwo;
1270 static char *write_ordering_str[] = {
1272 [WO_drain_io] = "drain",
1273 [WO_bdev_flush] = "flush",
1276 pwo = connection->write_ordering;
1279 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1280 struct drbd_device *device = peer_device->device;
1282 if (!get_ldev_if_state(device, D_ATTACHING))
1284 dc = rcu_dereference(device->ldev->disk_conf);
1286 if (wo == WO_bdev_flush && !dc->disk_flushes)
1288 if (wo == WO_drain_io && !dc->disk_drain)
1293 connection->write_ordering = wo;
1294 if (pwo != connection->write_ordering || wo == WO_bdev_flush)
1295 drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
1299 * drbd_submit_peer_request()
1300 * @device: DRBD device.
1301 * @peer_req: peer request
1302 * @rw: flag field, see bio->bi_rw
1304 * May spread the pages to multiple bios,
1305 * depending on bio_add_page restrictions.
1307 * Returns 0 if all bios have been submitted,
1308 * -ENOMEM if we could not allocate enough bios,
1309 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1310 * single page to an empty bio (which should never happen and likely indicates
1311 * that the lower level IO stack is in some way broken). This has been observed
1312 * on certain Xen deployments.
1314 /* TODO allocate from our own bio_set. */
/*
 * NOTE(review): this listing omits a number of original lines (braces,
 * labels and short statements), so only the visible steps are described:
 *  - a bio is obtained from bio_alloc(GFP_NOIO, nr_pages), pointed at the
 *    backing block device, and linked to the local "bios" list via bi_next;
 *  - pages of the peer request's page chain are added with bio_add_page(),
 *    at most PAGE_SIZE bytes each;
 *  - peer_req->pending_bios is set to n_bios before any bio is handed to
 *    drbd_generic_make_request() together with @fault_type.
 */
1315 int drbd_submit_peer_request(struct drbd_device *device,
1316 struct drbd_peer_request *peer_req,
1317 const unsigned rw, const int fault_type)
1319 struct bio *bios = NULL;
1321 struct page *page = peer_req->pages;
1322 sector_t sector = peer_req->i.sector;
1323 unsigned ds = peer_req->i.size;
1324 unsigned n_bios = 0;
1325 unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1328 /* In most cases, we will only need one bio. But in case the lower
1329 * level restrictions happen to be different at this offset on this
1330 * side than those of the sending peer, we may need to submit the
1331 * request in more than one bio.
1333 * Plain bio_alloc is good enough here, this is no DRBD internally
1334 * generated bio, but a bio allocated on behalf of the peer.
1337 bio = bio_alloc(GFP_NOIO, nr_pages);
1339 drbd_err(device, "submit_ee: Allocation of a bio failed\n");
1342 /* > peer_req->i.sector, unless this is the first bio */
1343 bio->bi_iter.bi_sector = sector;
1344 bio->bi_bdev = device->ldev->backing_bdev;
1346 bio->bi_private = peer_req;
1347 bio->bi_end_io = drbd_peer_request_endio;
1349 bio->bi_next = bios;
1353 page_chain_for_each(page) {
1354 unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1355 if (!bio_add_page(bio, page, len, 0)) {
1356 /* A single page must always be possible!
1357 * But in case it fails anyways,
1358 * we deal with it, and complain (below). */
1359 if (bio->bi_vcnt == 0) {
1361 "bio_add_page failed for len=%u, "
1362 "bi_vcnt=0 (bi_sector=%llu)\n",
1363 len, (uint64_t)bio->bi_iter.bi_sector);
1373 D_ASSERT(device, page == NULL);
1374 D_ASSERT(device, ds == 0);
/* The bio count is stored before the first bio is submitted below. */
1376 atomic_set(&peer_req->pending_bios, n_bios);
1379 bios = bios->bi_next;
1380 bio->bi_next = NULL;
1382 drbd_generic_make_request(device, fault_type, bio);
1389 bios = bios->bi_next;
/*
 * Remove peer_req's interval from device->write_requests, clear it, and
 * wake waiters on device->misc_wait.
 * NOTE(review): original line 1404 (between the comment and wake_up()) is
 * not visible in this listing; the wake-up may be conditional -- confirm.
 */
1395 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1396 struct drbd_peer_request *peer_req)
1398 struct drbd_interval *i = &peer_req->i;
1400 drbd_remove_interval(&device->write_requests, i);
1401 drbd_clear_interval(i);
1403 /* Wake up any processes waiting for this peer request to complete. */
1405 wake_up(&device->misc_wait);
/*
 * For every peer_device registered in connection->peer_devices, wait until
 * the active_ee list of the corresponding device is empty. A reference on
 * device->kref is taken before the wait and dropped afterwards, with
 * drbd_destroy_device as the release function.
 * NOTE(review): the declaration of vnr and any locking around the idr walk
 * are on lines missing from this listing.
 */
1408 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1410 struct drbd_peer_device *peer_device;
1414 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1415 struct drbd_device *device = peer_device->device;
1417 kref_get(&device->kref);
1419 drbd_wait_ee_list_empty(device, &device->active_ee);
1420 kref_put(&device->kref, drbd_destroy_device);
1426 static struct drbd_peer_device *
1427 conn_peer_device(struct drbd_connection *connection, int volume_number)
1429 return idr_find(&connection->peer_devices, volume_number);
/*
 * Handle a barrier packet from the peer: p->barrier is recorded as the
 * barrier number of connection->current_epoch and drbd_may_finish_epoch()
 * is run with EV_GOT_BARRIER_NR. Depending on connection->write_ordering a
 * new struct drbd_epoch is allocated with GFP_NOIO (on one visible path only
 * after conn_wait_active_ee_empty() and drbd_flush()). Under epoch_lock the
 * new epoch becomes connection->current_epoch if the current one still has a
 * non-zero epoch_size.
 * NOTE(review): case labels, returns and the handling of a failed or unused
 * allocation are on lines missing from this listing.
 */
1432 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1435 struct p_barrier *p = pi->data;
1436 struct drbd_epoch *epoch;
1438 /* FIXME these are unacked on connection,
1439 * not a specific (peer)device.
1441 connection->current_epoch->barrier_nr = p->barrier;
1442 connection->current_epoch->connection = connection;
1443 rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1445 /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1446 * the activity log, which means it would not be resynced in case the
1447 * R_PRIMARY crashes now.
1448 * Therefore we must send the barrier_ack after the barrier request was
1450 switch (connection->write_ordering) {
1452 if (rv == FE_RECYCLED)
1455 /* receiver context, in the writeout path of the other node.
1456 * avoid potential distributed deadlock */
1457 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1461 drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1466 conn_wait_active_ee_empty(connection);
1467 drbd_flush(connection);
1469 if (atomic_read(&connection->current_epoch->epoch_size)) {
1470 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1477 drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
/* The freshly allocated epoch starts with both counters at zero. */
1482 atomic_set(&epoch->epoch_size, 0);
1483 atomic_set(&epoch->active, 0);
1485 spin_lock(&connection->epoch_lock);
1486 if (atomic_read(&connection->current_epoch->epoch_size)) {
1487 list_add(&epoch->list, &connection->current_epoch->list);
1488 connection->current_epoch = epoch;
1489 connection->epochs++;
1491 /* The current_epoch got recycled while we allocated this one... */
1494 spin_unlock(&connection->epoch_lock);
1499 /* used from receive_RSDataReply (recv_resync_read)
1500 * and from receive_Data */
/*
 * Receive data_size bytes of payload into a newly allocated peer request.
 * When peer_integrity_tfm is configured, a digest of
 * crypto_hash_digestsize() bytes is received first into int_dig_in.
 * data_size must be 512-byte aligned and at most DRBD_MAX_BIO_SIZE, and
 * sector + (data_size >> 9) must not exceed the capacity of
 * device->this_bdev. The payload is received into the pages of
 * peer_req->pages in pieces of at most PAGE_SIZE. device->recv_cnt is
 * advanced by data_size >> 9.
 * NOTE(review): return statements and several conditions are on lines
 * missing from this listing; the visible failure paths release the peer
 * request with drbd_free_peer_req().
 */
1501 static struct drbd_peer_request *
1502 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1503 int data_size) __must_hold(local)
1505 struct drbd_device *device = peer_device->device;
1506 const sector_t capacity = drbd_get_capacity(device->this_bdev);
1507 struct drbd_peer_request *peer_req;
1510 void *dig_in = peer_device->connection->int_dig_in;
1511 void *dig_vv = peer_device->connection->int_dig_vv;
1512 unsigned long *data;
1515 if (peer_device->connection->peer_integrity_tfm) {
1516 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1518 * FIXME: Receive the incoming digest into the receive buffer
1519 * here, together with its struct p_data?
1521 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1527 if (!expect(IS_ALIGNED(data_size, 512)))
1529 if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1532 /* even though we trust our peer,
1533 * we sometimes have to double check. */
1534 if (sector + (data_size>>9) > capacity) {
1535 drbd_err(device, "request from peer beyond end of local disk: "
1536 "capacity: %llus < sector: %llus + size: %u\n",
1537 (unsigned long long)capacity,
1538 (unsigned long long)sector, data_size);
1542 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1543 * "criss-cross" setup, that might cause write-out on some other DRBD,
1544 * which in turn might block on the other node at this very place. */
1545 peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, GFP_NOIO);
1553 page = peer_req->pages;
1554 page_chain_for_each(page) {
1555 unsigned len = min_t(int, ds, PAGE_SIZE);
1557 err = drbd_recv_all_warn(peer_device->connection, data, len);
1558 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1559 drbd_err(device, "Fault injection: Corrupting data on receive\n");
1560 data[0] = data[0] ^ (unsigned long)-1;
1564 drbd_free_peer_req(device, peer_req);
/* Checksum what was received and compare it with the digest read into dig_in. */
1571 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1572 if (memcmp(dig_in, dig_vv, dgs)) {
1573 drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1574 (unsigned long long)sector, data_size);
1575 drbd_free_peer_req(device, peer_req);
1579 device->recv_cnt += data_size>>9;
/*
 * Visible steps: one page is obtained from drbd_alloc_pages(), data is
 * received into it in pieces of at most PAGE_SIZE bytes, and the page is
 * released again with drbd_free_pages().
 * NOTE(review): the loop construct and return value are on lines missing
 * from this listing.
 */
1583 /* drbd_drain_block() just takes a data block
1584 * out of the socket input buffer, and discards it.
1586 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1595 page = drbd_alloc_pages(peer_device, 1, 1);
1599 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1601 err = drbd_recv_all_warn(peer_device->connection, data, len);
1607 drbd_free_pages(peer_device->device, page, 0);
/*
 * Receive the payload of a read reply straight into req->master_bio
 * ("dless": no local peer request is allocated here). When
 * peer_integrity_tfm is configured, the digest is received first and later
 * compared with the checksum computed over the filled bio.
 * NOTE(review): the error checks and return statements are on lines missing
 * from this listing.
 */
1611 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1612 sector_t sector, int data_size)
1614 struct bio_vec bvec;
1615 struct bvec_iter iter;
1617 int dgs, err, expect;
1618 void *dig_in = peer_device->connection->int_dig_in;
1619 void *dig_vv = peer_device->connection->int_dig_vv;
1622 if (peer_device->connection->peer_integrity_tfm) {
1623 dgs = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1624 err = drbd_recv_all_warn(peer_device->connection, dig_in, dgs);
1630 /* optimistically update recv_cnt. if receiving fails below,
1631 * we disconnect anyways, and counters will be reset. */
1632 peer_device->device->recv_cnt += data_size>>9;
1634 bio = req->master_bio;
1635 D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
/* Each segment is mapped with kmap(), filled with at most bv_len bytes,
 * and unmapped again. */
1637 bio_for_each_segment(bvec, bio, iter) {
1638 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1639 expect = min_t(int, data_size, bvec.bv_len);
1640 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1641 kunmap(bvec.bv_page);
1644 data_size -= expect;
1648 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1649 if (memcmp(dig_in, dig_vv, dgs)) {
1650 drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1655 D_ASSERT(peer_device->device, data_size == 0);
/*
 * Completion work for a resync write. If the request is not flagged
 * EE_WAS_ERROR, the range is marked in sync and P_RS_WRITE_ACK is sent;
 * the other visible branch records the failure with drbd_rs_failed_io() and
 * sends P_NEG_ACK. dec_unacked() follows both branches.
 * NOTE(review): the else keyword and the return are on lines missing from
 * this listing.
 */
1660 * e_end_resync_block() is called in asender context via
1661 * drbd_finish_peer_reqs().
1663 static int e_end_resync_block(struct drbd_work *w, int unused)
1665 struct drbd_peer_request *peer_req =
1666 container_of(w, struct drbd_peer_request, w);
1667 struct drbd_peer_device *peer_device = peer_req->peer_device;
1668 struct drbd_device *device = peer_device->device;
1669 sector_t sector = peer_req->i.sector;
1672 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1674 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1675 drbd_set_in_sync(device, sector, peer_req->i.size);
1676 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1678 /* Record failure to sync */
1679 drbd_rs_failed_io(device, sector, peer_req->i.size);
1681 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1683 dec_unacked(device);
/*
 * Read one resync data block from the peer into a new peer request (block
 * id ID_SYNCER), queue it on device->sync_ee with e_end_resync_block as its
 * completion callback, and submit it as a WRITE with fault type
 * DRBD_FAULT_RS_WR. The sectors are added to device->rs_sect_ev before
 * submission. If submission fails, the request is unlinked from the list
 * again and freed.
 * NOTE(review): the check of read_in_block()'s result and the return
 * statements are on lines missing from this listing.
 */
1688 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1689 int data_size) __releases(local)
1691 struct drbd_device *device = peer_device->device;
1692 struct drbd_peer_request *peer_req;
1694 peer_req = read_in_block(peer_device, ID_SYNCER, sector, data_size);
1698 dec_rs_pending(device);
1700 inc_unacked(device);
1701 /* corresponding dec_unacked() in e_end_resync_block()
1702 * respective _drbd_clear_done_ee */
1704 peer_req->w.cb = e_end_resync_block;
1706 spin_lock_irq(&device->resource->req_lock);
1707 list_add(&peer_req->w.list, &device->sync_ee);
1708 spin_unlock_irq(&device->resource->req_lock);
1710 atomic_add(data_size >> 9, &device->rs_sect_ev);
1711 if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1714 /* don't care for the reason here */
1715 drbd_err(device, "submit failed, triggering re-connect\n");
1716 spin_lock_irq(&device->resource->req_lock);
1717 list_del(&peer_req->w.list);
1718 spin_unlock_irq(&device->resource->req_lock);
1720 drbd_free_peer_req(device, peer_req);
/*
 * The id supplied by the peer is interpreted as the address of a
 * struct drbd_request. It is checked with drbd_contains_interval() against
 * @root at @sector, and the interval must be marked local. The error
 * message names @func.
 * NOTE(review): the return statements and the use of @missing_ok are on
 * lines missing from this listing.
 */
1726 static struct drbd_request *
1727 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1728 sector_t sector, bool missing_ok, const char *func)
1730 struct drbd_request *req;
1732 /* Request object according to our peer */
1733 req = (struct drbd_request *)(unsigned long)id;
1734 if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1737 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1738 (unsigned long)id, (unsigned long long)sector);
/*
 * Handle the peer's answer to one of our read requests: the request is
 * looked up in device->read_requests under req_lock, the payload is
 * received into it with recv_dless_read(), and req_mod(req, DATA_RECEIVED)
 * is applied.
 * NOTE(review): the NULL/error checks guarding these steps and the return
 * are on lines missing from this listing.
 */
1743 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1745 struct drbd_peer_device *peer_device;
1746 struct drbd_device *device;
1747 struct drbd_request *req;
1750 struct p_data *p = pi->data;
1752 peer_device = conn_peer_device(connection, pi->vnr);
1755 device = peer_device->device;
1757 sector = be64_to_cpu(p->sector);
1759 spin_lock_irq(&device->resource->req_lock);
1760 req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1761 spin_unlock_irq(&device->resource->req_lock);
1765 /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1766 * special casing it there for the various failure cases.
1767 * still no race with drbd_fail_pending_reads */
1768 err = recv_dless_read(peer_device, req, sector, pi->size);
1770 req_mod(req, DATA_RECEIVED);
1771 /* else: nothing. handled from drbd_disconnect...
1772 * I don't think we may complete this just yet
1773 * in case we are "on-disconnect: freeze" */
/*
 * Handle a resync data packet. With a local disk reference (get_ldev) the
 * block is received and written via recv_resync_read(); the other visible
 * path logs a rate-limited error, drains the payload from the socket and
 * sends P_NEG_ACK. pi->size >> 9 sectors are added to device->rs_sect_in.
 * NOTE(review): the else keyword, the peer_device check and the return are
 * on lines missing from this listing.
 */
1778 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1780 struct drbd_peer_device *peer_device;
1781 struct drbd_device *device;
1784 struct p_data *p = pi->data;
1786 peer_device = conn_peer_device(connection, pi->vnr);
1789 device = peer_device->device;
1791 sector = be64_to_cpu(p->sector);
1792 D_ASSERT(device, p->block_id == ID_SYNCER);
1794 if (get_ldev(device)) {
1795 /* data is submitted to disk within recv_resync_read.
1796 * corresponding put_ldev done below on error,
1797 * or in drbd_peer_request_endio. */
1798 err = recv_resync_read(peer_device, sector, pi->size);
1800 if (__ratelimit(&drbd_ratelimit_state))
1801 drbd_err(device, "Can not write resync data to local disk.\n");
1803 err = drbd_drain_block(peer_device, pi->size);
1805 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1808 atomic_add(pi->size >> 9, &device->rs_sect_in);
/*
 * Walk the intervals in device->write_requests that overlap the given
 * sector/size range and apply CONFLICT_RESOLVED to the requests behind
 * them. The visible condition tests for RQ_LOCAL_PENDING or a missing
 * RQ_POSTPONED flag; presumably such requests are skipped.
 * NOTE(review): original lines 1820-1821 and 1825 are not visible in this
 * listing -- confirm the skip conditions against the full source.
 */
1813 static void restart_conflicting_writes(struct drbd_device *device,
1814 sector_t sector, int size)
1816 struct drbd_interval *i;
1817 struct drbd_request *req;
1819 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1822 req = container_of(i, struct drbd_request, i);
1823 if (req->rq_state & RQ_LOCAL_PENDING ||
1824 !(req->rq_state & RQ_POSTPONED))
1826 /* as it is RQ_POSTPONED, this will cause it to
1827 * be queued on the retry workqueue. */
1828 __req_mod(req, CONFLICT_RESOLVED, NULL);
/*
 * Completion work for a mirrored write.
 * When EE_SEND_WRITE_ACK is set and the request is not flagged
 * EE_WAS_ERROR, the ack is P_RS_WRITE_ACK if state.conn lies between
 * C_SYNC_SOURCE and C_PAUSED_SYNC_T and EE_MAY_SET_IN_SYNC is set (the range
 * is then also set in sync), else P_WRITE_ACK. The other visible ack is
 * P_NEG_ACK. Afterwards, if EE_IN_INTERVAL_TREE is set, the interval is
 * removed under req_lock and, with EE_RESTART_REQUESTS,
 * restart_conflicting_writes() is called. Finally the epoch is notified via
 * drbd_may_finish_epoch() with EV_PUT, plus EV_CLEANUP when @cancel is set.
 * NOTE(review): else keywords, closing braces and the return are on lines
 * missing from this listing.
 */
1833 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1835 static int e_end_block(struct drbd_work *w, int cancel)
1837 struct drbd_peer_request *peer_req =
1838 container_of(w, struct drbd_peer_request, w);
1839 struct drbd_peer_device *peer_device = peer_req->peer_device;
1840 struct drbd_device *device = peer_device->device;
1841 sector_t sector = peer_req->i.sector;
1844 if (peer_req->flags & EE_SEND_WRITE_ACK) {
1845 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1846 pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1847 device->state.conn <= C_PAUSED_SYNC_T &&
1848 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1849 P_RS_WRITE_ACK : P_WRITE_ACK;
1850 err = drbd_send_ack(peer_device, pcmd, peer_req);
1851 if (pcmd == P_RS_WRITE_ACK)
1852 drbd_set_in_sync(device, sector, peer_req->i.size);
1854 err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1855 /* we expect it to be marked out of sync anyways...
1856 * maybe assert this? */
1858 dec_unacked(device);
1860 /* we delete from the conflict detection hash _after_ we sent out the
1861 * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right. */
1862 if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1863 spin_lock_irq(&device->resource->req_lock);
1864 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1865 drbd_remove_epoch_entry_interval(device, peer_req);
1866 if (peer_req->flags & EE_RESTART_REQUESTS)
1867 restart_conflicting_writes(device, sector, peer_req->i.size);
1868 spin_unlock_irq(&device->resource->req_lock);
1870 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1872 drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1877 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1879 struct drbd_peer_request *peer_req =
1880 container_of(w, struct drbd_peer_request, w);
1881 struct drbd_peer_device *peer_device = peer_req->peer_device;
1884 err = drbd_send_ack(peer_device, ack, peer_req);
1885 dec_unacked(peer_device->device);
1890 static int e_send_superseded(struct drbd_work *w, int unused)
1892 return e_send_ack(w, P_SUPERSEDED);
1895 static int e_send_retry_write(struct drbd_work *w, int unused)
1897 struct drbd_peer_request *peer_req =
1898 container_of(w, struct drbd_peer_request, w);
1899 struct drbd_connection *connection = peer_req->peer_device->connection;
1901 return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1902 P_RETRY_WRITE : P_SUPERSEDED);
1905 static bool seq_greater(u32 a, u32 b)
1908 * We assume 32-bit wrap-around here.
1909 * For 24-bit wrap-around, we would have to shift:
1912 return (s32)a - (s32)b > 0;
1915 static u32 seq_max(u32 a, u32 b)
1917 return seq_greater(a, b) ? a : b;
1920 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
1922 struct drbd_device *device = peer_device->device;
1923 unsigned int newest_peer_seq;
1925 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
1926 spin_lock(&device->peer_seq_lock);
1927 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
1928 device->peer_seq = newest_peer_seq;
1929 spin_unlock(&device->peer_seq_lock);
1930 /* wake up only if we actually changed device->peer_seq */
1931 if (peer_seq == newest_peer_seq)
1932 wake_up(&device->seq_wait);
1936 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
1938 return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
1941 /* maybe change sync_ee into interval trees as well? */
/*
 * Scan device->sync_ee under req_lock for a queued resync request whose
 * sector range overlaps that of peer_req, using overlaps().
 * NOTE(review): the result variable, the loop exit and the return are on
 * lines missing from this listing.
 */
1942 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
1944 struct drbd_peer_request *rs_req;
1947 spin_lock_irq(&device->resource->req_lock);
1948 list_for_each_entry(rs_req, &device->sync_ee, w.list) {
1949 if (overlaps(peer_req->i.sector, peer_req->i.size,
1950 rs_req->i.sector, rs_req->i.size)) {
1955 spin_unlock_irq(&device->resource->req_lock);
1960 /* Called from receive_Data.
1961 * Synchronize packets on sock with packets on msock.
1963 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1964 * packet traveling on msock, they are still processed in the order they have
1967 * Note: we don't care for Ack packets overtaking P_DATA packets.
1969 * In case packet_seq is larger than device->peer_seq number, there are
1970 * outstanding packets on the msock. We wait for them to arrive.
1971 * In case we are the logically next packet, we update device->peer_seq
1972 * ourselves. Correctly handles 32bit wrap around.
1974 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1975 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1976 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1977 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1979 * returns 0 if we may process the packet,
1980 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
/*
 * NOTE(review): the loop construct, rcu locking, local declarations and
 * return paths are on lines missing from this listing. Visible behaviour:
 * the RESOLVE_CONFLICTS flag is tested first; under peer_seq_lock the
 * sequence check passes once peer_seq - 1 is not greater than
 * device->peer_seq, which is then raised to the newer of both values.
 * Otherwise the task sleeps in TASK_INTERRUPTIBLE on seq_wait with a timeout
 * of ping_timeo * HZ / 10 jiffies, with peer_seq_lock dropped around the
 * sleep; a timeout is logged as an error.
 */
1981 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
1983 struct drbd_device *device = peer_device->device;
1988 if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
1991 spin_lock(&device->peer_seq_lock);
1993 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
1994 device->peer_seq = seq_max(device->peer_seq, peer_seq);
1998 if (signal_pending(current)) {
2004 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2010 /* Only need to wait if two_primaries is enabled */
2011 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2012 spin_unlock(&device->peer_seq_lock);
2014 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2016 timeout = schedule_timeout(timeout);
2017 spin_lock(&device->peer_seq_lock);
2020 drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2024 spin_unlock(&device->peer_seq_lock);
2025 finish_wait(&device->seq_wait, &wait);
2029 /* see also bio_flags_to_wire()
2030 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2031 * flags and back. We may replicate to other kernel versions. */
2032 static unsigned long wire_flags_to_bio(struct drbd_device *device, u32 dpf)
2034 return (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2035 (dpf & DP_FUA ? REQ_FUA : 0) |
2036 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2037 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
/*
 * For requests in device->write_requests that overlap the given range and
 * carry RQ_POSTPONED: the flag is cleared, NEG_ACKED is applied via
 * __req_mod(), and complete_master_bio() is called with req_lock released
 * around it (the lock is re-taken afterwards).
 * NOTE(review): the second line of the signature, the skip conditions and
 * any restart of the tree walk are on lines missing from this listing.
 */
2040 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2043 struct drbd_interval *i;
2046 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2047 struct drbd_request *req;
2048 struct bio_and_error m;
2052 req = container_of(i, struct drbd_request, i);
2053 if (!(req->rq_state & RQ_POSTPONED))
2055 req->rq_state &= ~RQ_POSTPONED;
2056 __req_mod(req, NEG_ACKED, &m);
2057 spin_unlock_irq(&device->resource->req_lock);
2059 complete_master_bio(device, &m);
2060 spin_lock_irq(&device->resource->req_lock);
/*
 * Insert peer_req's interval into device->write_requests and deal with
 * every other interval in the tree that overlaps it.
 *  - For a conflicting request sent by the peer, drbd_wait_misc() waits for
 *    the earlier one.
 *  - With RESOLVE_CONFLICTS set on the connection, the peer request is
 *    queued on done_ee and the asender is woken; the callback is
 *    e_send_superseded when the new range is fully contained in the
 *    overlapping one.
 *  - In the other visible branch, drbd_wait_misc() waits for local requests
 *    that are RQ_LOCAL_PENDING or not RQ_POSTPONED; the connection can be
 *    moved to C_TIMEOUT and fail_postponed_requests() called.
 *    EE_RESTART_REQUESTS is set on the peer request.
 * The last visible statement removes the interval from the tree again.
 * NOTE(review): most control flow (conditions, gotos, labels, returns) is
 * on lines missing from this listing; confirm against the full source.
 */
2065 static int handle_write_conflicts(struct drbd_device *device,
2066 struct drbd_peer_request *peer_req)
2068 struct drbd_connection *connection = first_peer_device(device)->connection;
2069 bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2070 sector_t sector = peer_req->i.sector;
2071 const unsigned int size = peer_req->i.size;
2072 struct drbd_interval *i;
2077 * Inserting the peer request into the write_requests tree will prevent
2078 * new conflicting local requests from being added.
2080 drbd_insert_interval(&device->write_requests, &peer_req->i);
2083 drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2084 if (i == &peer_req->i)
2089 * Our peer has sent a conflicting remote request; this
2090 * should not happen in a two-node setup. Wait for the
2091 * earlier peer request to complete.
2093 err = drbd_wait_misc(device, i);
2099 equal = i->sector == sector && i->size == size;
2100 if (resolve_conflicts) {
2102 * If the peer request is fully contained within the
2103 * overlapping request, it can be considered overwritten
2104 * and thus superseded; otherwise, it will be retried
2105 * once all overlapping requests have completed.
2107 bool superseded = i->sector <= sector && i->sector +
2108 (i->size >> 9) >= sector + (size >> 9);
2111 drbd_alert(device, "Concurrent writes detected: "
2112 "local=%llus +%u, remote=%llus +%u, "
2113 "assuming %s came first\n",
2114 (unsigned long long)i->sector, i->size,
2115 (unsigned long long)sector, size,
2116 superseded ? "local" : "remote");
2118 inc_unacked(device);
2119 peer_req->w.cb = superseded ? e_send_superseded :
2121 list_add_tail(&peer_req->w.list, &device->done_ee);
2122 wake_asender(first_peer_device(device)->connection);
2127 struct drbd_request *req =
2128 container_of(i, struct drbd_request, i);
2131 drbd_alert(device, "Concurrent writes detected: "
2132 "local=%llus +%u, remote=%llus +%u\n",
2133 (unsigned long long)i->sector, i->size,
2134 (unsigned long long)sector, size);
2136 if (req->rq_state & RQ_LOCAL_PENDING ||
2137 !(req->rq_state & RQ_POSTPONED)) {
2139 * Wait for the node with the discard flag to
2140 * decide if this request has been superseded
2141 * or needs to be retried.
2142 * Requests that have been superseded will
2143 * disappear from the write_requests tree.
2145 * In addition, wait for the conflicting
2146 * request to finish locally before submitting
2147 * the conflicting peer request.
2149 err = drbd_wait_misc(device, &req->i);
2151 _conn_request_state(first_peer_device(device)->connection,
2152 NS(conn, C_TIMEOUT),
2154 fail_postponed_requests(device, sector, size);
2160 * Remember to restart the conflicting requests after
2161 * the new peer request has completed.
2163 peer_req->flags |= EE_RESTART_REQUESTS;
2170 drbd_remove_epoch_entry_interval(device, peer_req);
2174 /* mirrored write */
/*
 * Receive a write from the peer and submit it to the local backing device.
 * Visible steps: without a local disk reference the sequence number is
 * still synchronised, P_NEG_ACK is sent, the current epoch's size is bumped
 * and the payload drained. Otherwise the block is read with
 * read_in_block(), given e_end_block as completion callback, attached to
 * connection->current_epoch under epoch_lock, run through
 * handle_write_conflicts() (presumably only when two_primaries is set),
 * queued on active_ee, acknowledged according to dp_flags (derived from the
 * wire protocol for agreed_pro_version < 100), and submitted with
 * drbd_submit_peer_request(). On submit failure the request is unlinked,
 * its interval removed, its epoch reference put with EV_CLEANUP, and freed.
 * NOTE(review): many short lines (conditions, case labels, goto labels,
 * returns) are missing from this listing.
 */
2175 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2177 struct drbd_peer_device *peer_device;
2178 struct drbd_device *device;
2180 struct drbd_peer_request *peer_req;
2181 struct p_data *p = pi->data;
2182 u32 peer_seq = be32_to_cpu(p->seq_num);
2187 peer_device = conn_peer_device(connection, pi->vnr);
2190 device = peer_device->device;
2192 if (!get_ldev(device)) {
2195 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2196 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2197 atomic_inc(&connection->current_epoch->epoch_size);
2198 err2 = drbd_drain_block(peer_device, pi->size);
2205 * Corresponding put_ldev done either below (on various errors), or in
2206 * drbd_peer_request_endio, if we successfully submit the data at the
2207 * end of this function.
2210 sector = be64_to_cpu(p->sector);
2211 peer_req = read_in_block(peer_device, p->block_id, sector, pi->size);
2217 peer_req->w.cb = e_end_block;
2219 dp_flags = be32_to_cpu(p->dp_flags);
2220 rw |= wire_flags_to_bio(device, dp_flags);
2221 if (peer_req->pages == NULL) {
2222 D_ASSERT(device, peer_req->i.size == 0);
2223 D_ASSERT(device, dp_flags & DP_FLUSH);
2226 if (dp_flags & DP_MAY_SET_IN_SYNC)
2227 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2229 spin_lock(&connection->epoch_lock);
2230 peer_req->epoch = connection->current_epoch;
2231 atomic_inc(&peer_req->epoch->epoch_size);
2232 atomic_inc(&peer_req->epoch->active);
2233 spin_unlock(&connection->epoch_lock);
2236 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2239 peer_req->flags |= EE_IN_INTERVAL_TREE;
2240 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2242 goto out_interrupted;
2243 spin_lock_irq(&device->resource->req_lock);
2244 err = handle_write_conflicts(device, peer_req);
2246 spin_unlock_irq(&device->resource->req_lock);
2247 if (err == -ENOENT) {
2251 goto out_interrupted;
2254 update_peer_seq(peer_device, peer_seq);
2255 spin_lock_irq(&device->resource->req_lock);
2257 list_add(&peer_req->w.list, &device->active_ee);
2258 spin_unlock_irq(&device->resource->req_lock);
2260 if (device->state.conn == C_SYNC_TARGET)
2261 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2263 if (peer_device->connection->agreed_pro_version < 100) {
2265 switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
2267 dp_flags |= DP_SEND_WRITE_ACK;
2270 dp_flags |= DP_SEND_RECEIVE_ACK;
2276 if (dp_flags & DP_SEND_WRITE_ACK) {
2277 peer_req->flags |= EE_SEND_WRITE_ACK;
2278 inc_unacked(device);
2279 /* corresponding dec_unacked() in e_end_block()
2280 * respective _drbd_clear_done_ee */
2283 if (dp_flags & DP_SEND_RECEIVE_ACK) {
2284 /* I really don't like it that the receiver thread
2285 * sends on the msock, but anyways */
2286 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2289 if (device->state.pdsk < D_INCONSISTENT) {
2290 /* In case we have the only disk of the cluster, */
2291 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2292 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2293 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2294 drbd_al_begin_io(device, &peer_req->i, true);
2297 err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2301 /* don't care for the reason here */
2302 drbd_err(device, "submit failed, triggering re-connect\n");
2303 spin_lock_irq(&device->resource->req_lock);
2304 list_del(&peer_req->w.list);
2305 drbd_remove_epoch_entry_interval(device, peer_req);
2306 spin_unlock_irq(&device->resource->req_lock);
2307 if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2308 drbd_al_complete_io(device, &peer_req->i);
2311 drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2313 drbd_free_peer_req(device, peer_req);
/*
 * Called with a device and the sector of a resync request; the caller
 * visible in this file sleeps for HZ/10 when the result is non-zero.
 * Visible logic: c_min_rate is read from disk_conf (0 means disabled); the
 * resync LRU entry for the sector is checked for BME_PRIORITY under al_lock;
 * curr_events is the backing disk's sector count minus rs_sect_ev. If
 * rs_last_events is zero or more than 64 new events were seen, the rate
 * dbdt is computed from the previous sync mark and compared with c_min_rate.
 * NOTE(review): return statements and several conditions are on lines
 * missing from this listing.
 */
2317 /* We may throttle resync, if the lower device seems to be busy,
2318 * and current sync rate is above c_min_rate.
2320 * To decide whether or not the lower device is busy, we use a scheme similar
2321 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2322 * (more than 64 sectors) of activity we cannot account for with our own resync
2323 * activity, it obviously is "busy".
2325 * The current sync rate used here uses only the most recent two step marks,
2326 * to have a short time average so we can react faster.
2328 int drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
2330 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2331 unsigned long db, dt, dbdt;
2332 struct lc_element *tmp;
2335 unsigned int c_min_rate;
2338 c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2341 /* feature disabled? */
2342 if (c_min_rate == 0)
2345 spin_lock_irq(&device->al_lock);
2346 tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2348 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2349 if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2350 spin_unlock_irq(&device->al_lock);
2353 /* Do not slow down if app IO is already waiting for this extent */
2355 spin_unlock_irq(&device->al_lock);
2357 curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2358 (int)part_stat_read(&disk->part0, sectors[1]) -
2359 atomic_read(&device->rs_sect_ev);
2361 if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
2362 unsigned long rs_left;
2365 device->rs_last_events = curr_events;
2367 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2369 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2371 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2372 rs_left = device->ov_left;
2374 rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2376 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2379 db = device->rs_mark_left[i] - rs_left;
2380 dbdt = Bit2KB(db/dt);
2382 if (dbdt > c_min_rate)
/*
 * Handle read-type requests from the peer. The case labels visible here are
 * P_DATA_REQUEST, P_RS_DATA_REQUEST and P_CSUM_RS_REQUEST; the code also
 * tests pi->cmd for P_OV_REPLY.
 * The request is validated first: size must be positive, 512-byte aligned,
 * at most DRBD_MAX_BIO_SIZE, and sector + (size >> 9) within the capacity
 * of this_bdev. Without a local disk in D_UP_TO_DATE state a negative reply
 * is sent and any payload drained. Otherwise a peer request is allocated
 * with GFP_NOIO, given the completion callback matching the packet type,
 * queued on device->read_ee and submitted as a READ. For requests carrying
 * a digest, the digest is received into a struct digest_info attached to
 * the peer request. Before submission the visible code may sleep for HZ/10
 * (drbd_rs_should_slow_down) and calls drbd_rs_begin_io().
 * NOTE(review): further case labels, gotos, labels and returns are on lines
 * missing from this listing.
 */
2389 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2391 struct drbd_peer_device *peer_device;
2392 struct drbd_device *device;
2395 struct drbd_peer_request *peer_req;
2396 struct digest_info *di = NULL;
2398 unsigned int fault_type;
2399 struct p_block_req *p = pi->data;
2401 peer_device = conn_peer_device(connection, pi->vnr);
2404 device = peer_device->device;
2405 capacity = drbd_get_capacity(device->this_bdev);
2407 sector = be64_to_cpu(p->sector);
2408 size = be32_to_cpu(p->blksize);
2410 if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2411 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2412 (unsigned long long)sector, size);
2415 if (sector + (size>>9) > capacity) {
2416 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2417 (unsigned long long)sector, size);
2421 if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2424 case P_DATA_REQUEST:
2425 drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2427 case P_RS_DATA_REQUEST:
2428 case P_CSUM_RS_REQUEST:
2430 drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2434 dec_rs_pending(device);
2435 drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2440 if (verb && __ratelimit(&drbd_ratelimit_state))
2441 drbd_err(device, "Can not satisfy peer's read request, "
2442 "no local data.\n");
2444 /* drain possible payload */
2445 return drbd_drain_block(peer_device, pi->size);
2448 /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2449 * "criss-cross" setup, that might cause write-out on some other DRBD,
2450 * which in turn might block on the other node at this very place. */
2451 peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size, GFP_NOIO);
2458 case P_DATA_REQUEST:
2459 peer_req->w.cb = w_e_end_data_req;
2460 fault_type = DRBD_FAULT_DT_RD;
2461 /* application IO, don't drbd_rs_begin_io */
2464 case P_RS_DATA_REQUEST:
2465 peer_req->w.cb = w_e_end_rsdata_req;
2466 fault_type = DRBD_FAULT_RS_RD;
2467 /* used in the sector offset progress display */
2468 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2472 case P_CSUM_RS_REQUEST:
2473 fault_type = DRBD_FAULT_RS_RD;
2474 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2478 di->digest_size = pi->size;
2479 di->digest = (((char *)di)+sizeof(struct digest_info));
2481 peer_req->digest = di;
2482 peer_req->flags |= EE_HAS_DIGEST;
2484 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2487 if (pi->cmd == P_CSUM_RS_REQUEST) {
2488 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2489 peer_req->w.cb = w_e_end_csum_rs_req;
2490 /* used in the sector offset progress display */
2491 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2492 } else if (pi->cmd == P_OV_REPLY) {
2493 /* track progress, we may need to throttle */
2494 atomic_add(size >> 9, &device->rs_sect_in);
2495 peer_req->w.cb = w_e_end_ov_reply;
2496 dec_rs_pending(device);
2497 /* drbd_rs_begin_io done when we sent this request,
2498 * but accounting still needs to be done. */
2499 goto submit_for_resync;
2504 if (device->ov_start_sector == ~(sector_t)0 &&
2505 peer_device->connection->agreed_pro_version >= 90) {
2506 unsigned long now = jiffies;
2508 device->ov_start_sector = sector;
2509 device->ov_position = sector;
2510 device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2511 device->rs_total = device->ov_left;
2512 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2513 device->rs_mark_left[i] = device->ov_left;
2514 device->rs_mark_time[i] = now;
2516 drbd_info(device, "Online Verify start sector: %llu\n",
2517 (unsigned long long)sector);
2519 peer_req->w.cb = w_e_end_ov_req;
2520 fault_type = DRBD_FAULT_RS_RD;
2527 /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2528 * wrt the receiver, but it is not as straightforward as it may seem.
2529 * Various places in the resync start and stop logic assume resync
2530 * requests are processed in order, requeuing this on the worker thread
2531 * introduces a bunch of new code for synchronization between threads.
2533 * Unlimited throttling before drbd_rs_begin_io may stall the resync
2534 * "forever", throttling after drbd_rs_begin_io will lock that extent
2535 * for application writes for the same time. For now, just throttle
2536 * here, where the rest of the code expects the receiver to sleep for
2540 /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2541 * this defers syncer requests for some time, before letting at least
2542 * one request through. The resync controller on the receiving side
2543 * will adapt to the incoming rate accordingly.
2545 * We cannot throttle here if remote is Primary/SyncTarget:
2546 * we would also throttle its application reads.
2547 * In that case, throttling is done on the SyncTarget only.
2549 if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
2550 schedule_timeout_uninterruptible(HZ/10);
2551 if (drbd_rs_begin_io(device, sector))
2555 atomic_add(size >> 9, &device->rs_sect_ev);
2558 inc_unacked(device);
2559 spin_lock_irq(&device->resource->req_lock);
2560 list_add_tail(&peer_req->w.list, &device->read_ee);
2561 spin_unlock_irq(&device->resource->req_lock);
2563 if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2566 /* don't care for the reason here */
2567 drbd_err(device, "submit failed, triggering re-connect\n");
2568 spin_lock_irq(&device->resource->req_lock);
2569 list_del(&peer_req->w.list);
2570 spin_unlock_irq(&device->resource->req_lock);
2571 /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2575 drbd_free_peer_req(device, peer_req);
/**
 * drbd_asb_recover_0p - Recover after split-brain with no remaining primaries
 */
/*
 * Choose a resync direction after a split brain, driven by the after_sb_0p
 * policy read from the connection's net_conf.
 *
 * Inputs: bit 0 of the local and of the peer's UI_BITMAP UUID (self/peer),
 * the peer's UI_SIZE slot (ch_peer) and device->comm_bm_set (ch_self).
 * rv starts at -100 and is overwritten by whichever policy branch decides.
 *
 * NOTE(review): this listing has gaps (see the embedded line numbers); the
 * statements on the omitted lines -- most rv assignments and breaks -- are
 * not described here.
 */
2582 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2584 struct drbd_device *device = peer_device->device;
2585 int self, peer, rv = -100;
2586 unsigned long ch_self, ch_peer;
2587 enum drbd_after_sb_p after_sb_0p;
/* Only the lowest bit of each side's bitmap UUID is of interest here. */
2589 self = device->ldev->md.uuid[UI_BITMAP] & 1;
2590 peer = device->p_uuid[UI_BITMAP] & 1;
/* presumably the amount of changed data on each side -- TODO confirm */
2592 ch_peer = device->p_uuid[UI_SIZE];
2593 ch_self = device->comm_bm_set;
2596 after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2598 switch (after_sb_0p) {
/* Policies that are reported as a configuration error in this context. */
2600 case ASB_DISCARD_SECONDARY:
2601 case ASB_CALL_HELPER:
2603 drbd_err(device, "Configuration error.\n");
2605 case ASB_DISCONNECT:
2607 case ASB_DISCARD_YOUNGER_PRI:
2608 if (self == 0 && peer == 1) {
2612 if (self == 1 && peer == 0) {
2616 /* Else fall through to one of the other strategies... */
2617 case ASB_DISCARD_OLDER_PRI:
2618 if (self == 0 && peer == 1) {
2622 if (self == 1 && peer == 0) {
2626 /* Else fall through to one of the other strategies... */
2627 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2628 "Using discard-least-changes instead\n");
2629 case ASB_DISCARD_ZERO_CHG:
/* Both sides unchanged: the RESOLVE_CONFLICTS flag breaks the tie. */
2630 if (ch_peer == 0 && ch_self == 0) {
2631 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
/* Exactly one side unchanged: rv = 1 if it is the peer, -1 if it is us. */
2635 if (ch_peer == 0) { rv = 1; break; }
2636 if (ch_self == 0) { rv = -1; break; }
/* Only the younger/older-primary policies continue into least-changes. */
2638 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2640 case ASB_DISCARD_LEAST_CHG:
2641 if (ch_self < ch_peer)
2643 else if (ch_self > ch_peer)
2645 else /* ( ch_self == ch_peer ) */
2646 /* Well, then use something else. */
2647 rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2650 case ASB_DISCARD_LOCAL:
2653 case ASB_DISCARD_REMOTE:
/**
 * drbd_asb_recover_1p - Recover after split-brain with one remaining primary
 */
/*
 * Split-brain recovery driven by the after_sb_1p policy read from the
 * connection's net_conf.  Several policies delegate to
 * drbd_asb_recover_0p() and then compare its verdict (hg) with our own role.
 *
 * NOTE(review): this listing has gaps (see the embedded line numbers); the
 * assignments and returns on the omitted lines are not described here.
 */
2663 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2665 struct drbd_device *device = peer_device->device;
2667 enum drbd_after_sb_p after_sb_1p;
2670 after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2672 switch (after_sb_1p) {
/* Policies that are reported as a configuration error in this context. */
2673 case ASB_DISCARD_YOUNGER_PRI:
2674 case ASB_DISCARD_OLDER_PRI:
2675 case ASB_DISCARD_LEAST_CHG:
2676 case ASB_DISCARD_LOCAL:
2677 case ASB_DISCARD_REMOTE:
2678 case ASB_DISCARD_ZERO_CHG:
2679 drbd_err(device, "Configuration error.\n");
2681 case ASB_DISCONNECT:
/* Ask the zero-primaries logic, then check its verdict against our role. */
2684 hg = drbd_asb_recover_0p(peer_device);
2685 if (hg == -1 && device->state.role == R_SECONDARY)
2687 if (hg == 1 && device->state.role == R_PRIMARY)
2691 rv = drbd_asb_recover_0p(peer_device);
2693 case ASB_DISCARD_SECONDARY:
/* The primary's data wins: 1 if we are primary, -1 otherwise. */
2694 return device->state.role == R_PRIMARY ? 1 : -1;
2695 case ASB_CALL_HELPER:
2696 hg = drbd_asb_recover_0p(peer_device);
/* The verdict goes against us while we are primary: try to step down. */
2697 if (hg == -1 && device->state.role == R_PRIMARY) {
2698 enum drbd_state_rv rv2;
2700 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2701 * we might be here in C_WF_REPORT_PARAMS which is transient.
2702 * we do not need to wait for the after state change work either. */
2703 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
/* Demotion failed: run the "pri-lost-after-sb" helper. */
2704 if (rv2 != SS_SUCCESS) {
2705 drbd_khelper(device, "pri-lost-after-sb");
2707 drbd_warn(device, "Successfully gave up primary role.\n");
/**
 * drbd_asb_recover_2p - Recover after split-brain with two remaining primaries
 */
/*
 * Split-brain recovery driven by the after_sb_2p policy read from the
 * connection's net_conf.  Where a decision is attempted it is delegated to
 * drbd_asb_recover_0p().
 *
 * NOTE(review): this listing has gaps (see the embedded line numbers); the
 * condition guarding the demotion attempt and the returns are on omitted
 * lines and are not described here.
 */
2720 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2722 struct drbd_device *device = peer_device->device;
2724 enum drbd_after_sb_p after_sb_2p;
2727 after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2729 switch (after_sb_2p) {
/* Policies that are reported as a configuration error in this context. */
2730 case ASB_DISCARD_YOUNGER_PRI:
2731 case ASB_DISCARD_OLDER_PRI:
2732 case ASB_DISCARD_LEAST_CHG:
2733 case ASB_DISCARD_LOCAL:
2734 case ASB_DISCARD_REMOTE:
2736 case ASB_DISCARD_SECONDARY:
2737 case ASB_DISCARD_ZERO_CHG:
2738 drbd_err(device, "Configuration error.\n");
2741 rv = drbd_asb_recover_0p(peer_device);
2743 case ASB_DISCONNECT:
2745 case ASB_CALL_HELPER:
2746 hg = drbd_asb_recover_0p(peer_device);
2748 enum drbd_state_rv rv2;
2750 /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2751 * we might be here in C_WF_REPORT_PARAMS which is transient.
2752 * we do not need to wait for the after state change work either. */
2753 rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
/* Demotion failed: run the "pri-lost-after-sb" helper. */
2754 if (rv2 != SS_SUCCESS) {
2755 drbd_khelper(device, "pri-lost-after-sb");
2757 drbd_warn(device, "Successfully gave up primary role.\n");
2767 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2768 u64 bits, u64 flags)
2771 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2774 drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2776 (unsigned long long)uuid[UI_CURRENT],
2777 (unsigned long long)uuid[UI_BITMAP],
2778 (unsigned long long)uuid[UI_HISTORY_START],
2779 (unsigned long long)uuid[UI_HISTORY_END],
2780 (unsigned long long)bits,
2781 (unsigned long long)flags);
/*
  100	after split brain try auto recover
    2	C_SYNC_SOURCE set BitMap
    1	C_SYNC_SOURCE use BitMap
    0	no Sync
   -1	C_SYNC_TARGET use BitMap
   -2	C_SYNC_TARGET set BitMap
 -100	after split brain, disconnect
-1000	unrelated data
-1091	requires proto 91
-1096	requires proto 96
 */
/*
 * Compare the local UUID set (device->ldev->md.uuid) with the peer's
 * (device->p_uuid) to decide whether and in which direction to resync.
 * Every comparison masks out bit 0 of the UUIDs first.
 *
 * The checks run in order: current vs. current, then the special cases in
 * which one side missed the end or the start of a resync (these may rewrite
 * the local or the cached peer UUIDs), then current vs. bitmap, current vs.
 * history, bitmap vs. bitmap and finally history vs. history.
 *
 * NOTE(review): this listing has gaps (see the embedded line numbers); the
 * *rule_nr assignments and the return statements are on omitted lines and
 * are not described here.
 */
2796 static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
2801 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2802 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
/* Both sides freshly created. */
2805 if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
/* Only we are freshly created (or have no current UUID at all). */
2809 if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2810 peer != UUID_JUST_CREATED)
/* Only the peer is freshly created (or has no current UUID at all). */
2814 if (self != UUID_JUST_CREATED &&
2815 (peer == UUID_JUST_CREATED || peer == (u64)0))
2819 int rct, dc; /* roles at crash time */
/* We still carry a bitmap UUID while the peer has none. */
2821 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2823 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2826 if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2827 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2828 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
/* Rotate our bitmap UUID into the history and clear the bitmap slot. */
2829 drbd_uuid_move_history(device);
2830 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2831 device->ldev->md.uuid[UI_BITMAP] = 0;
2833 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2834 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2837 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
/* Mirror case: the peer still carries a bitmap UUID while we have none. */
2844 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2846 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2849 if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2850 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2851 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
/* Apply the same rotation to our cached copy of the peer's UUIDs. */
2853 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2854 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2855 device->p_uuid[UI_BITMAP] = 0UL;
2857 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2860 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
2867 /* Common power [off|failure] */
2868 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
2869 (device->p_uuid[UI_FLAGS] & 2);
2870 /* lowest bit is set when we were primary,
2871 * next bit (weight 2) is set when peer was primary */
2875 case 0: /* !self_pri && !peer_pri */ return 0;
2876 case 1: /* self_pri && !peer_pri */ return 1;
2877 case 2: /* !self_pri && peer_pri */ return -1;
2878 case 3: /* self_pri && peer_pri */
2879 dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
/* Next candidates on the peer side: its bitmap UUID, then its newest history entry. */
2885 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2890 peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
/* Which test detects a lost P_SYNC_UUID depends on the agreed protocol version. */
2892 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2893 (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2894 (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2895 peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
2896 /* The last P_SYNC_UUID did not get though. Undo the last start of
2897 resync as sync source modifications of the peer's UUIDs. */
2899 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2902 device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
2903 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
2905 drbd_info(device, "Lost last syncUUID packet, corrected:\n");
2906 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
/* Our current UUID against each entry of the peer's history. */
2913 self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2914 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2915 peer = device->p_uuid[i] & ~((u64)1);
/* Same sequence of checks with the roles of self and peer swapped. */
2921 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2922 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2927 self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2929 if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
2930 (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2931 (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2932 self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2933 /* The last P_SYNC_UUID did not get though. Undo the last start of
2934 resync as sync source modifications of our UUIDs. */
2936 if (first_peer_device(device)->connection->agreed_pro_version < 91)
2939 __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
2940 __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
2942 drbd_info(device, "Last syncUUID did not get through, corrected:\n");
2943 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2944 device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
/* The peer's current UUID against each entry of our history. */
2952 peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2953 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2954 self = device->ldev->md.uuid[i] & ~((u64)1);
/* Identical, non-zero bitmap UUIDs on both sides. */
2960 self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2961 peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
2962 if (self == peer && self != ((u64)0))
/* Last resort: look for any entry shared by the two histories. */
2966 for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2967 self = device->ldev->md.uuid[i] & ~((u64)1);
2968 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2969 peer = device->p_uuid[j] & ~((u64)1);
2978 /* drbd_sync_handshake() returns the new conn state on success, or
2979 CONN_MASK (-1) on failure.
2981 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
2982 enum drbd_role peer_role,
2983 enum drbd_disk_state peer_disk) __must_hold(local)
2985 struct drbd_device *device = peer_device->device;
2986 enum drbd_conns rv = C_MASK;
2987 enum drbd_disk_state mydisk;
2988 struct net_conf *nc;
2989 int hg, rule_nr, rr_conflict, tentative;
2991 mydisk = device->state.disk;
2992 if (mydisk == D_NEGOTIATING)
2993 mydisk = device->new_state_tmp.disk;
2995 drbd_info(device, "drbd_sync_handshake:\n");
2997 spin_lock_irq(&device->ldev->md.uuid_lock);
2998 drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
2999 drbd_uuid_dump(device, "peer", device->p_uuid,
3000 device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3002 hg = drbd_uuid_compare(device, &rule_nr);
3003 spin_unlock_irq(&device->ldev->md.uuid_lock);
3005 drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3008 drbd_alert(device, "Unrelated data, aborting!\n");
3012 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3016 if ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3017 (peer_disk == D_INCONSISTENT && mydisk > D_INCONSISTENT)) {
3018 int f = (hg == -100) || abs(hg) == 2;
3019 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3022 drbd_info(device, "Becoming sync %s due to disk states.\n",
3023 hg > 0 ? "source" : "target");
3027 drbd_khelper(device, "initial-split-brain");
3030 nc = rcu_dereference(peer_device->connection->net_conf);
3032 if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3033 int pcount = (device->state.role == R_PRIMARY)
3034 + (peer_role == R_PRIMARY);
3035 int forced = (hg == -100);
3039 hg = drbd_asb_recover_0p(peer_device);
3042 hg = drbd_asb_recover_1p(peer_device);
3045 hg = drbd_asb_recover_2p(peer_device);
3048 if (abs(hg) < 100) {
3049 drbd_warn(device, "Split-Brain detected, %d primaries, "
3050 "automatically solved. Sync from %s node\n",
3051 pcount, (hg < 0) ? "peer" : "this");
3053 drbd_warn(device, "Doing a full sync, since"
3054 " UUIDs where ambiguous.\n");
3061 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3063 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3067 drbd_warn(device, "Split-Brain detected, manually solved. "
3068 "Sync from %s node\n",
3069 (hg < 0) ? "peer" : "this");
3071 rr_conflict = nc->rr_conflict;
3072 tentative = nc->tentative;
3076 /* FIXME this log message is not correct if we end up here
3077 * after an attempted attach on a diskless node.
3078 * We just refuse to attach -- well, we drop the "connection"
3079 * to that disk, in a way... */
3080 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3081 drbd_khelper(device, "split-brain");
3085 if (hg > 0 && mydisk <= D_INCONSISTENT) {
3086 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3090 if (hg < 0 && /* by intention we do not use mydisk here. */
3091 device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3092 switch (rr_conflict) {
3093 case ASB_CALL_HELPER:
3094 drbd_khelper(device, "pri-lost");
3096 case ASB_DISCONNECT:
3097 drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3100 drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3105 if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3107 drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3109 drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3110 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3111 abs(hg) >= 2 ? "full" : "bit-map based");
3116 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3117 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3118 BM_LOCKED_SET_ALLOWED))
3122 if (hg > 0) { /* become sync source. */
3124 } else if (hg < 0) { /* become sync target */
3128 if (drbd_bm_total_weight(device)) {
3129 drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3130 drbd_bm_total_weight(device));
3137 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3139 /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3140 if (peer == ASB_DISCARD_REMOTE)
3141 return ASB_DISCARD_LOCAL;
3143 /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3144 if (peer == ASB_DISCARD_LOCAL)
3145 return ASB_DISCARD_REMOTE;
3147 /* everything else is valid if they are equal on both sides. */
3151 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3153 struct p_protocol *p = pi->data;
3154 enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3155 int p_proto, p_discard_my_data, p_two_primaries, cf;
3156 struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3157 char integrity_alg[SHARED_SECRET_MAX] = "";
3158 struct crypto_hash *peer_integrity_tfm = NULL;
3159 void *int_dig_in = NULL, *int_dig_vv = NULL;
3161 p_proto = be32_to_cpu(p->protocol);
3162 p_after_sb_0p = be32_to_cpu(p->after_sb_0p);
3163 p_after_sb_1p = be32_to_cpu(p->after_sb_1p);
3164 p_after_sb_2p = be32_to_cpu(p->after_sb_2p);
3165 p_two_primaries = be32_to_cpu(p->two_primaries);
3166 cf = be32_to_cpu(p->conn_flags);
3167 p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3169 if (connection->agreed_pro_version >= 87) {
3172 if (pi->size > sizeof(integrity_alg))
3174 err = drbd_recv_all(connection, integrity_alg, pi->size);
3177 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3180 if (pi->cmd != P_PROTOCOL_UPDATE) {
3181 clear_bit(CONN_DRY_RUN, &connection->flags);
3183 if (cf & CF_DRY_RUN)
3184 set_bit(CONN_DRY_RUN, &connection->flags);
3187 nc = rcu_dereference(connection->net_conf);
3189 if (p_proto != nc->wire_protocol) {
3190 drbd_err(connection, "incompatible %s settings\n", "protocol");
3191 goto disconnect_rcu_unlock;
3194 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3195 drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3196 goto disconnect_rcu_unlock;
3199 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3200 drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3201 goto disconnect_rcu_unlock;
3204 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3205 drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3206 goto disconnect_rcu_unlock;
3209 if (p_discard_my_data && nc->discard_my_data) {
3210 drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3211 goto disconnect_rcu_unlock;
3214 if (p_two_primaries != nc->two_primaries) {
3215 drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3216 goto disconnect_rcu_unlock;
3219 if (strcmp(integrity_alg, nc->integrity_alg)) {
3220 drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3221 goto disconnect_rcu_unlock;
3227 if (integrity_alg[0]) {
3231 * We can only change the peer data integrity algorithm
3232 * here. Changing our own data integrity algorithm
3233 * requires that we send a P_PROTOCOL_UPDATE packet at
3234 * the same time; otherwise, the peer has no way to
3235 * tell between which packets the algorithm should
3239 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3240 if (!peer_integrity_tfm) {
3241 drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3246 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3247 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3248 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3249 if (!(int_dig_in && int_dig_vv)) {
3250 drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3255 new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3256 if (!new_net_conf) {
3257 drbd_err(connection, "Allocation of new net_conf failed\n");
3261 mutex_lock(&connection->data.mutex);
3262 mutex_lock(&connection->resource->conf_update);
3263 old_net_conf = connection->net_conf;
3264 *new_net_conf = *old_net_conf;
3266 new_net_conf->wire_protocol = p_proto;
3267 new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3268 new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3269 new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3270 new_net_conf->two_primaries = p_two_primaries;
3272 rcu_assign_pointer(connection->net_conf, new_net_conf);
3273 mutex_unlock(&connection->resource->conf_update);
3274 mutex_unlock(&connection->data.mutex);
3276 crypto_free_hash(connection->peer_integrity_tfm);
3277 kfree(connection->int_dig_in);
3278 kfree(connection->int_dig_vv);
3279 connection->peer_integrity_tfm = peer_integrity_tfm;
3280 connection->int_dig_in = int_dig_in;
3281 connection->int_dig_vv = int_dig_vv;
3283 if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3284 drbd_info(connection, "peer data-integrity-alg: %s\n",
3285 integrity_alg[0] ? integrity_alg : "(none)");
3288 kfree(old_net_conf);
3291 disconnect_rcu_unlock:
3294 crypto_free_hash(peer_integrity_tfm);
3297 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3302 * input: alg name, feature name
3303 * return: NULL (alg name was "")
3304 * ERR_PTR(error) if something goes wrong
3305 * or the crypto hash ptr, if it worked out ok. */
3307 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3308 const char *alg, const char *name)
3310 struct crypto_hash *tfm;
3315 tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3317 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3318 alg, name, PTR_ERR(tfm));
/*
 * Receive the payload of a packet (pi->size bytes) into the connection's
 * receive buffer, at most DRBD_SOCKET_BUFFER_SIZE bytes per drbd_recv()
 * call.  config_unknown_volume() uses this for packets it ignores.
 *
 * NOTE(review): this listing has gaps (see the embedded line numbers); the
 * loop header, the handling of the drbd_recv() result and the return
 * statements are on omitted lines and are not described here.
 */
3324 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3326 void *buffer = connection->data.rbuf;
3327 int size = pi->size;
/* Clamp each read to the buffer size; s is then reused for the result. */
3330 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3331 s = drbd_recv(connection, buffer, s);
3345 * config_unknown_volume - device configuration command for unknown volume
3347 * When a device is added to an existing connection, the node on which the
3348 * device is added first will send configuration commands to its peer but the
3349 * peer will not know about the device yet. It will warn and ignore these
3350 * commands. Once the device is added on the second node, the second node will
3351 * send the same device configuration commands, but in the other direction.
3353 * (We can also end up here if drbd is misconfigured.)
3355 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3357 drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3358 cmdname(pi->cmd), pi->vnr);
3359 return ignore_remaining_packet(connection, pi);
/*
 * Handle the peer's resync parameter packet for one volume.
 *
 * The packet layout depends on the agreed protocol version (apv): a
 * p_rs_param header for apv <= 88, p_rs_param_89 up to apv 94 and
 * p_rs_param_95 from then on.  The function takes over the peer's
 * resync_rate, the verify-alg and csums-alg names (allocating the matching
 * hash transforms) and, for apv > 94 with a local disk, the resync
 * controller settings.  New net_conf / disk_conf / plan objects are
 * published with rcu_assign_pointer() while conf_update is held.
 *
 * NOTE(review): this listing has gaps (see the embedded line numbers); the
 * error labels, their gotos and the return statements are on omitted lines
 * and are not described here.
 */
3362 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3364 struct drbd_peer_device *peer_device;
3365 struct drbd_device *device;
3366 struct p_rs_param_95 *p;
3367 unsigned int header_size, data_size, exp_max_sz;
3368 struct crypto_hash *verify_tfm = NULL;
3369 struct crypto_hash *csums_tfm = NULL;
3370 struct net_conf *old_net_conf, *new_net_conf = NULL;
3371 struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3372 const int apv = connection->agreed_pro_version;
3373 struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3377 peer_device = conn_peer_device(connection, pi->vnr);
3379 return config_unknown_volume(connection, pi);
3380 device = peer_device->device;
/* Largest packet size that is acceptable for the agreed protocol version. */
3382 exp_max_sz = apv <= 87 ? sizeof(struct p_rs_param)
3383 : apv == 88 ? sizeof(struct p_rs_param)
3385 : apv <= 94 ? sizeof(struct p_rs_param_89)
3386 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3388 if (pi->size > exp_max_sz) {
3389 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3390 pi->size, exp_max_sz);
/*
 * data_size is unsigned: if pi->size is smaller than header_size the
 * subtraction wraps around.
 */
3395 header_size = sizeof(struct p_rs_param);
3396 data_size = pi->size - header_size;
3397 } else if (apv <= 94) {
3398 header_size = sizeof(struct p_rs_param_89);
3399 data_size = pi->size - header_size;
3400 D_ASSERT(device, data_size == 0);
3402 header_size = sizeof(struct p_rs_param_95);
3403 data_size = pi->size - header_size;
3404 D_ASSERT(device, data_size == 0);
3407 /* initialize verify_alg and csums_alg */
/* one memset covers 2 * SHARED_SECRET_MAX bytes starting at verify_alg */
3409 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3411 err = drbd_recv_all(peer_device->connection, p, header_size);
3415 mutex_lock(&connection->resource->conf_update);
3416 old_net_conf = peer_device->connection->net_conf;
/* With a local disk, prepare a modified copy of its disk_conf. */
3417 if (get_ldev(device)) {
3418 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3419 if (!new_disk_conf) {
3421 mutex_unlock(&connection->resource->conf_update);
3422 drbd_err(device, "Allocation of new disk_conf failed\n");
3426 old_disk_conf = device->ldev->disk_conf;
3427 *new_disk_conf = *old_disk_conf;
3429 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
/* The verify-alg name is received separately, as data_size bytes. */
3434 if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3435 drbd_err(device, "verify-alg of wrong size, "
3436 "peer wants %u, accepting only up to %u byte\n",
3437 data_size, SHARED_SECRET_MAX);
3442 err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3445 /* we expect NUL terminated string */
3446 /* but just in case someone tries to be evil */
3447 D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3448 p->verify_alg[data_size-1] = 0;
3450 } else /* apv >= 89 */ {
3451 /* we still expect NUL terminated strings */
3452 /* but just in case someone tries to be evil */
3453 D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3454 D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3455 p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3456 p->csums_alg[SHARED_SECRET_MAX-1] = 0;
/* A differing verify-alg is reported as an error during C_WF_REPORT_PARAMS. */
3459 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3460 if (device->state.conn == C_WF_REPORT_PARAMS) {
3461 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3462 old_net_conf->verify_alg, p->verify_alg);
3465 verify_tfm = drbd_crypto_alloc_digest_safe(device,
3466 p->verify_alg, "verify-alg");
3467 if (IS_ERR(verify_tfm)) {
/* Same handling for csums-alg, which is only compared for apv >= 89. */
3473 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3474 if (device->state.conn == C_WF_REPORT_PARAMS) {
3475 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3476 old_net_conf->csums_alg, p->csums_alg);
3479 csums_tfm = drbd_crypto_alloc_digest_safe(device,
3480 p->csums_alg, "csums-alg");
3481 if (IS_ERR(csums_tfm)) {
/* Resync controller settings are only taken over for apv > 94. */
3487 if (apv > 94 && new_disk_conf) {
3488 new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3489 new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3490 new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3491 new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
/* A new plan fifo is allocated only if its size changes. */
3493 fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3494 if (fifo_size != device->rs_plan_s->size) {
3495 new_plan = fifo_alloc(fifo_size);
/* NOTE(review): unlike the other messages here, this one has no trailing "\n". */
3497 drbd_err(device, "kmalloc of fifo_buffer failed");
/* A new net_conf is only needed if at least one algorithm changed. */
3504 if (verify_tfm || csums_tfm) {
3505 new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3506 if (!new_net_conf) {
3507 drbd_err(device, "Allocation of new net_conf failed\n");
3511 *new_net_conf = *old_net_conf;
/*
 * p->verify_alg / p->csums_alg were forcibly NUL terminated above.
 * NOTE(review): presumably the destination arrays hold SHARED_SECRET_MAX
 * bytes as well -- confirm against struct net_conf.
 */
3514 strcpy(new_net_conf->verify_alg, p->verify_alg);
3515 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3516 crypto_free_hash(peer_device->connection->verify_tfm);
3517 peer_device->connection->verify_tfm = verify_tfm;
3518 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3521 strcpy(new_net_conf->csums_alg, p->csums_alg);
3522 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3523 crypto_free_hash(peer_device->connection->csums_tfm);
3524 peer_device->connection->csums_tfm = csums_tfm;
3525 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3527 rcu_assign_pointer(connection->net_conf, new_net_conf);
/* Publish the remaining new objects, then release the previous ones. */
3531 if (new_disk_conf) {
3532 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3537 old_plan = device->rs_plan_s;
3538 rcu_assign_pointer(device->rs_plan_s, new_plan);
3541 mutex_unlock(&connection->resource->conf_update);
3544 kfree(old_net_conf);
3545 kfree(old_disk_conf);
/* Error paths: drop the unpublished disk_conf and release conf_update. */
3551 if (new_disk_conf) {
3553 kfree(new_disk_conf);
3555 mutex_unlock(&connection->resource->conf_update);
3560 if (new_disk_conf) {
3562 kfree(new_disk_conf);
3564 mutex_unlock(&connection->resource->conf_update);
3565 /* just for completeness: actually not needed,
3566 * as this is not reached if csums_tfm was ok. */
3567 crypto_free_hash(csums_tfm);
3568 /* but free the verify_tfm again, if csums_tfm did not work out */
3569 crypto_free_hash(verify_tfm);
3570 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3574 /* warn if the arguments differ by more than 12.5% */
3575 static void warn_if_differ_considerably(struct drbd_device *device,
3576 const char *s, sector_t a, sector_t b)
3579 if (a == 0 || b == 0)
3581 d = (a > b) ? (a - b) : (b - a);
3582 if (d > (a>>3) || d > (b>>3))
3583 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3584 (unsigned long long)a, (unsigned long long)b);
3587 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3589 struct drbd_peer_device *peer_device;
3590 struct drbd_device *device;
3591 struct p_sizes *p = pi->data;
3592 enum determine_dev_size dd = DS_UNCHANGED;
3593 sector_t p_size, p_usize, my_usize;
3594 int ldsc = 0; /* local disk size changed */
3595 enum dds_flags ddsf;
3597 peer_device = conn_peer_device(connection, pi->vnr);
3599 return config_unknown_volume(connection, pi);
3600 device = peer_device->device;
3602 p_size = be64_to_cpu(p->d_size);
3603 p_usize = be64_to_cpu(p->u_size);
3605 /* just store the peer's disk size for now.
3606 * we still need to figure out whether we accept that. */
3607 device->p_size = p_size;
3609 if (get_ldev(device)) {
3611 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3614 warn_if_differ_considerably(device, "lower level device sizes",
3615 p_size, drbd_get_max_capacity(device->ldev));
3616 warn_if_differ_considerably(device, "user requested size",
3619 /* if this is the first connect, or an otherwise expected
3620 * param exchange, choose the minimum */
3621 if (device->state.conn == C_WF_REPORT_PARAMS)
3622 p_usize = min_not_zero(my_usize, p_usize);
3624 /* Never shrink a device with usable data during connect.
3625 But allow online shrinking if we are connected. */
3626 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3627 drbd_get_capacity(device->this_bdev) &&
3628 device->state.disk >= D_OUTDATED &&
3629 device->state.conn < C_CONNECTED) {
3630 drbd_err(device, "The peer's disk size is too small!\n");
3631 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3636 if (my_usize != p_usize) {
3637 struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3639 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3640 if (!new_disk_conf) {
3641 drbd_err(device, "Allocation of new disk_conf failed\n");
3646 mutex_lock(&connection->resource->conf_update);
3647 old_disk_conf = device->ldev->disk_conf;
3648 *new_disk_conf = *old_disk_conf;
3649 new_disk_conf->disk_size = p_usize;
3651 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3652 mutex_unlock(&connection->resource->conf_update);
3654 kfree(old_disk_conf);
3656 drbd_info(device, "Peer sets u_size to %lu sectors\n",
3657 (unsigned long)my_usize);
3663 ddsf = be16_to_cpu(p->dds_flags);
3664 if (get_ldev(device)) {
3665 dd = drbd_determine_dev_size(device, ddsf, NULL);
3669 drbd_md_sync(device);
3671 /* I am diskless, need to accept the peer's size. */
3672 drbd_set_my_capacity(device, p_size);
3675 device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3676 drbd_reconsider_max_bio_size(device);
3678 if (get_ldev(device)) {
3679 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3680 device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3687 if (device->state.conn > C_WF_REPORT_PARAMS) {
3688 if (be64_to_cpu(p->c_size) !=
3689 drbd_get_capacity(device->this_bdev) || ldsc) {
3690 /* we have different sizes, probably peer
3691 * needs to know my new size... */
3692 drbd_send_sizes(peer_device, 0, ddsf);
3694 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3695 (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3696 if (device->state.pdsk >= D_INCONSISTENT &&
3697 device->state.disk >= D_INCONSISTENT) {
3698 if (ddsf & DDSF_NO_RESYNC)
3699 drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3701 resync_after_online_grow(device);
3703 set_bit(RESYNC_AFTER_NEG, &device->flags);
/*
 * receive_uuids() - handler registered for P_UUIDS in drbd_cmd_handler[].
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->vnr selects the volume, pi->data holds a struct p_uuids
 *
 * Stores a host-order copy of the peer UUID set in device->p_uuid (freeing the
 * previously stored copy) and then reacts to it depending on the local disk,
 * role and connection state.
 *
 * Return: presumably 0 on success or a negative error code - TODO confirm.
 */
3710 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3712 struct drbd_peer_device *peer_device;
3713 struct drbd_device *device;
3714 struct p_uuids *p = pi->data;
3716 int i, updated_uuids = 0;
3718 peer_device = conn_peer_device(connection, pi->vnr);
3720 return config_unknown_volume(connection, pi);
3721 device = peer_device->device;
/* Private array for the peer UUIDs, allocated with GFP_NOIO. */
3723 p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3725 drbd_err(device, "kmalloc of p_uuid failed\n");
/* Convert every slot from big-endian wire order to host order. */
3729 for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3730 p_uuid[i] = be64_to_cpu(p->uuid[i]);
/* Swap in the new array and drop the one kept from an earlier packet. */
3732 kfree(device->p_uuid);
3733 device->p_uuid = p_uuid;
/* Not yet connected, local disk below D_INCONSISTENT and primary: the peer
 * is rejected (C_DISCONNECTING) unless its current UUID equals ed_uuid;
 * bit 0 of both values is ignored in the comparison. */
3735 if (device->state.conn < C_CONNECTED &&
3736 device->state.disk < D_INCONSISTENT &&
3737 device->state.role == R_PRIMARY &&
3738 (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3739 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3740 (unsigned long long)device->ed_uuid);
3741 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3745 if (get_ldev(device)) {
/* skip_initial_sync requires all of: state C_CONNECTED, agreed protocol
 * version >= 90, local current UUID still UUID_JUST_CREATED, and flag
 * value 8 set in the UUID flags sent by the peer. */
3746 int skip_initial_sync =
3747 device->state.conn == C_CONNECTED &&
3748 peer_device->connection->agreed_pro_version >= 90 &&
3749 device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3750 (p_uuid[UI_FLAGS] & 8);
3751 if (skip_initial_sync) {
3752 drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
/* Clear and write out the bitmap, adopt the peer current UUID, zero the
 * bitmap UUID and mark both disks D_UP_TO_DATE. */
3753 drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3754 "clear_n_write from receive_uuids",
3755 BM_LOCKED_TEST_ALLOWED);
3756 _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3757 _drbd_uuid_set(device, UI_BITMAP, 0);
3758 _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3760 drbd_md_sync(device);
3764 } else if (device->state.disk < D_INCONSISTENT &&
3765 device->state.role == R_PRIMARY) {
3766 /* I am a diskless primary, the peer just created a new current UUID
3768 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3771 /* Before we test for the disk state, we should wait until an eventually
3772 ongoing cluster wide state change is finished. That is important if
3773 we are primary and are detaching from our disk. We need to see the
3774 new disk state... */
3775 mutex_lock(device->state_mutex);
3776 mutex_unlock(device->state_mutex);
/* Connected with a local disk below D_INCONSISTENT: hand the peer current
 * UUID to drbd_set_ed_uuid() and remember whether anything changed. */
3777 if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3778 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3781 drbd_print_uuids(device, "receiver updated UUIDs to");
3787 * convert_state() - Converts the peer's view of the cluster state to our point of view
3788 * @ps: The state as seen by the peer.
/*
 * Translate a state word as sent by the peer into the local perspective.
 * The connection state goes through the c_tab lookup below, and the two
 * pause bits of the peer (aftr_isp, user_isp) are merged into peer_isp.
 * Used by receive_req_state() and receive_req_conn_state() on both the mask
 * and the value of a requested state change.
 */
3790 static union drbd_state convert_state(union drbd_state ps)
3792 union drbd_state ms;
/* Indexed by the connection state named by the peer; the S and T variants
 * of the sync and verify states map to each other, and C_DISCONNECTING is
 * seen locally as C_TEAR_DOWN. */
3794 static enum drbd_conns c_tab[] = {
3795 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3796 [C_CONNECTED] = C_CONNECTED,
3798 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3799 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3800 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3801 [C_VERIFY_S] = C_VERIFY_T,
/* NOTE(review): ps.conn indexes c_tab without a visible range check;
 * confirm that every value a peer may send is covered by the table. */
3807 ms.conn = c_tab[ps.conn];
3812 ms.peer_isp = (ps.aftr_isp | ps.user_isp);
/*
 * receive_req_state() - handler registered for P_STATE_CHG_REQ.
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->vnr selects the volume, pi->data is a struct p_req_state
 *
 * Decodes the requested mask/value pair, converts both to the local point of
 * view with convert_state(), applies them through drbd_change_state() and
 * sends the resulting status back with drbd_send_sr_reply().
 */
3817 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3819 struct drbd_peer_device *peer_device;
3820 struct drbd_device *device;
3821 struct p_req_state *p = pi->data;
3822 union drbd_state mask, val;
3823 enum drbd_state_rv rv;
3825 peer_device = conn_peer_device(connection, pi->vnr);
3828 device = peer_device->device;
/* Both fields arrive big-endian. */
3830 mask.i = be32_to_cpu(p->mask);
3831 val.i = be32_to_cpu(p->val);
/* With RESOLVE_CONFLICTS set on this connection and the device state mutex
 * currently held, the peer is told SS_CONCURRENT_ST_CHG. */
3833 if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3834 mutex_is_locked(device->state_mutex)) {
3835 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3839 mask = convert_state(mask);
3840 val = convert_state(val);
/* Apply the change locally and report the outcome to the peer. */
3842 rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3843 drbd_send_sr_reply(peer_device, rv);
3845 drbd_md_sync(device);
/*
 * receive_req_conn_state() - handler registered for P_CONN_ST_CHG_REQ.
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->data is a struct p_req_state
 *
 * Connection-wide counterpart of receive_req_state(): the converted
 * mask/value pair is passed to conn_request_state() and the result is sent
 * back with conn_send_sr_reply().
 */
3850 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
3852 struct p_req_state *p = pi->data;
3853 union drbd_state mask, val;
3854 enum drbd_state_rv rv;
3856 mask.i = be32_to_cpu(p->mask);
3857 val.i = be32_to_cpu(p->val);
/* With RESOLVE_CONFLICTS set and the connection state mutex currently
 * held, the peer is told SS_CONCURRENT_ST_CHG. */
3859 if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
3860 mutex_is_locked(&connection->cstate_mutex)) {
3861 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
3865 mask = convert_state(mask);
3866 val = convert_state(val);
/* The request is applied with CS_LOCAL_ONLY and CS_IGN_OUTD_FAIL set. */
3868 rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3869 conn_send_sr_reply(connection, rv);
/*
 * receive_state() - handler registered for P_STATE.
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->vnr selects the volume, pi->data is a struct p_state
 *
 * Takes the state word reported by the peer, derives the peer disk state to
 * believe (real_peer_disk), decides via drbd_sync_handshake() whether a
 * resync has to be considered, and commits the resulting local view (peer
 * role, pdsk, peer_isp and possibly a new connection state) with
 * _drbd_set_state() under req_lock.
 *
 * Return: presumably 0 on success or a negative error code - TODO confirm.
 */
3874 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
3876 struct drbd_peer_device *peer_device;
3877 struct drbd_device *device;
3878 struct p_state *p = pi->data;
3879 union drbd_state os, ns, peer_state;
3880 enum drbd_disk_state real_peer_disk;
3881 enum chg_state_flags cs_flags;
3884 peer_device = conn_peer_device(connection, pi->vnr);
3886 return config_unknown_volume(connection, pi);
3887 device = peer_device->device;
3889 peer_state.i = be32_to_cpu(p->state);
3891 real_peer_disk = peer_state.disk;
/* A peer in D_NEGOTIATING is judged by flag value 4 in the UUID flags it
 * sent earlier: set means D_INCONSISTENT, clear means D_CONSISTENT.
 * NOTE(review): device->p_uuid is dereferenced here without the NULL test
 * that guards the handshake further down - confirm it cannot be NULL. */
3892 if (peer_state.disk == D_NEGOTIATING) {
3893 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3894 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
/* Snapshot the current state under req_lock: os stays the reference, ns
 * is the state being built. */
3897 spin_lock_irq(&device->resource->req_lock);
3899 os = ns = drbd_read_state(device);
3900 spin_unlock_irq(&device->resource->req_lock);
3902 /* If some other part of the code (asender thread, timeout)
3903 * already decided to close the connection again,
3904 * we must not "re-establish" it here. */
3905 if (os.conn <= C_TEAR_DOWN)
3908 /* If this is the "end of sync" confirmation, usually the peer disk
3909 * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3910 * set) resync started in PausedSyncT, or if the timing of pause-/
3911 * unpause-sync events has been "just right", the peer disk may
3912 * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3914 if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3915 real_peer_disk == D_UP_TO_DATE &&
3916 os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3917 /* If we are (becoming) SyncSource, but peer is still in sync
3918 * preparation, ignore its uptodate-ness to avoid flapping, it
3919 * will change to inconsistent once the peer reaches active
3921 * It may have changed syncer-paused flags, however, so we
3922 * cannot ignore this completely. */
3923 if (peer_state.conn > C_CONNECTED &&
3924 peer_state.conn < C_SYNC_SOURCE)
3925 real_peer_disk = D_INCONSISTENT;
3927 /* if peer_state changes to connected at the same time,
3928 * it explicitly notifies us that it finished resync.
3929 * Maybe we should finish it up, too? */
3930 else if (os.conn >= C_SYNC_SOURCE &&
3931 peer_state.conn == C_CONNECTED) {
/* Only finish when no more bits are set than are accounted as failed. */
3932 if (drbd_bm_total_weight(device) <= device->rs_failed)
3933 drbd_resync_finished(device);
3938 /* explicit verify finished notification, stop sector reached. */
3939 if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3940 peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3941 ov_out_of_sync_print(device);
3942 drbd_resync_finished(device);
3946 /* peer says his disk is inconsistent, while we think it is uptodate,
3947 * and this happens while the peer still thinks we have a sync going on,
3948 * but we think we are already done with the sync.
3949 * We ignore this to avoid flapping pdsk.
3950 * This should not happen, if the peer is a recent version of drbd. */
3951 if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3952 os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3953 real_peer_disk = D_UP_TO_DATE;
/* Receiving the peer state completes the parameter exchange. */
3955 if (ns.conn == C_WF_REPORT_PARAMS)
3956 ns.conn = C_CONNECTED;
3958 if (peer_state.conn == C_AHEAD)
/* A handshake is only evaluated when peer UUIDs are stored, the peer disk
 * is at least D_NEGOTIATING and get_ldev_if_state(D_NEGOTIATING) succeeds. */
3961 if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3962 get_ldev_if_state(device, D_NEGOTIATING)) {
3963 int cr; /* consider resync */
3965 /* if we established a new connection */
3966 cr = (os.conn < C_CONNECTED);
3967 /* if we had an established connection
3968 * and one of the nodes newly attaches a disk */
3969 cr |= (os.conn == C_CONNECTED &&
3970 (peer_state.disk == D_NEGOTIATING ||
3971 os.disk == D_NEGOTIATING));
3972 /* if we have both been inconsistent, and the peer has been
3973 * forced to be UpToDate with --overwrite-data */
3974 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
3975 /* if we had been plain connected, and the admin requested to
3976 * start a sync by "invalidate" or "invalidate-remote" */
3977 cr |= (os.conn == C_CONNECTED &&
3978 (peer_state.conn >= C_STARTING_SYNC_S &&
3979 peer_state.conn <= C_WF_BITMAP_T));
3982 ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
/* C_MASK is handled as the handshake not yielding a usable connection
 * state (presumably its error marker - confirm in drbd_sync_handshake). */
3985 if (ns.conn == C_MASK) {
3986 ns.conn = C_CONNECTED;
3987 if (device->state.disk == D_NEGOTIATING) {
3988 drbd_force_state(device, NS(disk, D_FAILED));
3989 } else if (peer_state.disk == D_NEGOTIATING) {
3990 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
3991 peer_state.disk = D_DISKLESS;
3992 real_peer_disk = D_DISKLESS;
3994 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
3996 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
3997 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
/* Back under req_lock; the comparison detects a state change that
 * happened since the snapshot was taken. */
4003 spin_lock_irq(&device->resource->req_lock);
4004 if (os.i != drbd_read_state(device).i)
4006 clear_bit(CONSIDER_RESYNC, &device->flags);
4007 ns.peer = peer_state.role;
4008 ns.pdsk = real_peer_disk;
4009 ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4010 if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4011 ns.disk = device->new_state_tmp.disk;
/* CS_HARD is added unless this very transition takes the connection from
 * below C_CONNECTED to C_CONNECTED or above. */
4012 cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4013 if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4014 test_bit(NEW_CUR_UUID, &device->flags)) {
4015 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4016 for temporal network outages! */
4017 spin_unlock_irq(&device->resource->req_lock);
4018 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4019 tl_clear(peer_device->connection);
4020 drbd_uuid_new_current(device);
4021 clear_bit(NEW_CUR_UUID, &device->flags);
4022 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
/* Commit the new state and re-read the result while still holding
 * req_lock. */
4025 rv = _drbd_set_state(device, ns, cs_flags, NULL);
4026 ns = drbd_read_state(device);
4027 spin_unlock_irq(&device->resource->req_lock);
/* A rejected state change ends the connection. */
4029 if (rv < SS_SUCCESS) {
4030 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4034 if (os.conn > C_WF_REPORT_PARAMS) {
4035 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4036 peer_state.disk != D_NEGOTIATING ) {
4037 /* we want resync, peer has not yet decided to sync... */
4038 /* Nowadays only used when forcing a node into primary role and
4039 setting its disk to UpToDate with that */
4040 drbd_send_uuids(peer_device);
4041 drbd_send_current_state(peer_device);
4045 clear_bit(DISCARD_MY_DATA, &device->flags);
4047 drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
/*
 * receive_sync_uuid() - handler registered for P_SYNC_UUID.
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->vnr selects the volume, pi->data is a struct p_rs_uuid
 *
 * Waits until the device is ready for the sync UUID, then installs the UUID
 * from the packet as the current UUID, clears the bitmap UUID and starts a
 * resync as C_SYNC_TARGET. Without a usable local disk the packet is
 * ignored with an error message.
 */
4052 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4054 struct drbd_peer_device *peer_device;
4055 struct drbd_device *device;
4056 struct p_rs_uuid *p = pi->data;
4058 peer_device = conn_peer_device(connection, pi->vnr);
4061 device = peer_device->device;
/* Sleep on misc_wait until the connection state is C_WF_SYNC_UUID or
 * C_BEHIND, or it dropped below C_CONNECTED, or the disk state fell below
 * D_NEGOTIATING. */
4063 wait_event(device->misc_wait,
4064 device->state.conn == C_WF_SYNC_UUID ||
4065 device->state.conn == C_BEHIND ||
4066 device->state.conn < C_CONNECTED ||
4067 device->state.disk < D_NEGOTIATING);
4069 /* D_ASSERT(device, device->state.conn == C_WF_SYNC_UUID ); */
4071 /* Here the _drbd_uuid_ functions are right, current should
4072 _not_ be rotated into the history */
4073 if (get_ldev_if_state(device, D_NEGOTIATING)) {
4074 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4075 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4077 drbd_print_uuids(device, "updated sync uuid");
4078 drbd_start_resync(device, C_SYNC_TARGET);
4082 drbd_err(device, "Ignoring SyncUUID packet!\n");
4088 * receive_bitmap_plain
4090 * Return 0 when done, 1 when another iteration is needed, and a negative error
4091 * code upon failure.
/*
 * Receive one uncompressed bitmap packet and merge it into the local bitmap.
 * @peer_device: peer the bitmap comes from
 * @size: payload size announced in the packet header
 * @p: buffer the payload words are received into
 * @c: transfer context; word_offset and bit_offset are advanced here
 *
 * The expected payload size (want) is derived from the remaining bitmap words
 * and the per-packet payload capacity; it is compared against @size before
 * anything is read.
 */
4094 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4095 unsigned long *p, struct bm_xfer_ctx *c)
/* Payload capacity of one packet: socket buffer size minus the header. */
4097 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4098 drbd_header_size(peer_device->connection);
/* Words in this packet: bounded by the payload capacity and by the number
 * of bitmap words still outstanding. */
4099 unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4100 c->bm_words - c->word_offset);
4101 unsigned int want = num_words * sizeof(*p);
4105 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4110 err = drbd_recv_all(peer_device->connection, p, want);
4114 drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
/* Advance the context; bit_offset follows word_offset but never exceeds
 * the bitmap size in bits. */
4116 c->word_offset += num_words;
4117 c->bit_offset = c->word_offset * BITS_PER_LONG;
4118 if (c->bit_offset > c->bm_bits)
4119 c->bit_offset = c->bm_bits;
/* Encoding identifier of a compressed bitmap packet: the low four bits of
 * p->encoding, compared against RLE_VLI_Bits in decode_bitmap_c(). */
4124 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4126 return (enum drbd_bitmap_code)(p->encoding & 0x0f);
/* Returns 1 if bit 7 (0x80) of p->encoding is set, else 0; the value is the
 * initial toggle in recv_bm_rle_bits(). */
4129 static int dcbp_get_start(struct p_compressed_bm *p)
4131 return (p->encoding & 0x80) != 0;
/* Three-bit field in bits 4..6 of p->encoding; recv_bm_rle_bits() hands it
 * to bitstream_init() as the pad bit count. */
4134 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4136 return (p->encoding >> 4) & 0x7;
4142 * Return 0 when done, 1 when another iteration is needed, and a negative error
4143 * code upon failure.
/*
 * Decode one run-length encoded bitmap packet.
 * @peer_device: peer the bitmap comes from
 * @p: received packet; p->code holds the encoded bit stream
 * @c: transfer context; decoding starts at c->bit_offset
 * (the remaining parameter is the length handed to bitstream_init())
 *
 * Run lengths are decoded with vli_decode_bits() from a 64 bit look-ahead
 * window. The toggle value alternates from run to run, starting with
 * dcbp_get_start(); bit ranges are set through _drbd_bm_set_bits().
 */
4146 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4147 struct p_compressed_bm *p,
4148 struct bm_xfer_ctx *c,
4151 struct bitstream bs;
4155 unsigned long s = c->bit_offset;
4157 int toggle = dcbp_get_start(p);
4161 bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
/* Prime the look-ahead window with up to 64 bits. */
4163 bits = bitstream_get_bits(&bs, &look_ahead, 64);
/* One run length rl per iteration; s advances by rl and toggle flips. */
4167 for (have = bits; have > 0; s += rl, toggle = !toggle) {
4168 bits = vli_decode_bits(&rl, look_ahead);
/* A run whose end e reaches bm_bits or beyond does not fit the bitmap. */
4174 if (e >= c->bm_bits) {
4175 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4178 _drbd_bm_set_bits(peer_device->device, s, e);
4182 drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4183 have, bits, look_ahead,
4184 (unsigned int)(bs.cur.b - p->code),
4185 (unsigned int)bs.buf_len);
4188 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4189 if (likely(bits < 64))
4190 look_ahead >>= bits;
/* Top the window up again: fetch 64 - have fresh bits and place them above
 * the have bits that are still held. */
4195 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4198 look_ahead |= tmp << have;
4203 bm_xfer_ctx_bit_to_word_offset(c);
/* Nonzero (another iteration needed) unless s reached the bitmap end. */
4205 return (s != c->bm_bits);
4211 * Return 0 when done, 1 when another iteration is needed, and a negative error
4212 * code upon failure.
/*
 * Decode a compressed bitmap packet.
 * @peer_device: peer the bitmap comes from
 * @p: received packet, starting with the struct p_compressed_bm header
 * @c: transfer context passed on to the decoder
 * (the remaining parameter is the packet length including that header)
 *
 * Only the RLE_VLI_Bits encoding is handled, by recv_bm_rle_bits(); any other
 * encoding is logged and the connection is put into C_PROTOCOL_ERROR.
 */
4215 decode_bitmap_c(struct drbd_peer_device *peer_device,
4216 struct p_compressed_bm *p,
4217 struct bm_xfer_ctx *c,
/* The decoder gets the length of the encoded data only, without header. */
4220 if (dcbp_get_code(p) == RLE_VLI_Bits)
4221 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4223 /* other variants had been implemented for evaluation,
4224 * but have been dropped as this one turned out to be "best"
4225 * during all our tests. */
4227 drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4228 conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
/*
 * Log statistics about a finished bitmap transfer.
 * @device: device the bitmap belongs to
 * @direction: text inserted at the start of the log line
 * @c: transfer context holding the byte and packet counters
 *
 * Compares the bytes actually transferred (c->bytes[0] + c->bytes[1]) with
 * an estimate of what an uncompressed transfer would have needed. Index 1
 * of the counters is reported as plain, index 0 as RLE.
 */
4232 void INFO_bm_xfer_stats(struct drbd_device *device,
4233 const char *direction, struct bm_xfer_ctx *c)
4235 /* what would it take to transfer it "plaintext" */
4236 unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4237 unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
/* NOTE(review): bm_words is a word count while data_size is in bytes, so
 * the packet count estimate mixes units - confirm this is intended. */
4238 unsigned int plain =
4239 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4240 c->bm_words * sizeof(unsigned long);
4241 unsigned int total = c->bytes[0] + c->bytes[1];
4244 /* total can not be zero. but just in case: */
4248 /* don't report if not compressed */
4252 /* total < plain. check for overflow, still */
/* r is total relative to plain in units of one per mille; the first form
 * avoids overflowing 1000 * total. */
4253 r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4254 : (1000 * total / plain);
/* r is printed as a percentage with one decimal: r/10 and r % 10. */
4260 drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4261 "total %u; compression: %u.%u%%\n",
4263 c->bytes[1], c->packets[1],
4264 c->bytes[0], c->packets[0],
4265 total, r/10, r % 10);
4268 /* Since we are processing the bitfield from lower addresses to higher,
4269 it does not matter if we process it in 32 bit chunks or 64 bit
4270 chunks as long as it is little endian. (Understand it as byte stream,
4271 beginning with the lowest byte...) If we would use big endian
4272 we would need to process it from the highest address to the lowest,
4273 in order to be agnostic to the 32 vs 64 bits issue.
4275 NOTE(review): this used to say "returns 0 on failure, 1 if we successfully received it", but the body tracks an err value that is 0 on the success path; confirm against the return statement. */
/*
 * receive_bitmap() - handler registered for P_BITMAP and P_COMPRESSED_BITMAP.
 * @connection: connection the packet arrived on
 * @pi: packet info of the first bitmap packet; reused for following headers
 *
 * Holds the bitmap lock while packets are merged into the local bitmap via
 * receive_bitmap_plain() or decode_bitmap_c(). Afterwards, in C_WF_BITMAP_T
 * the own bitmap is sent and C_WF_SYNC_UUID is requested; in C_WF_BITMAP_S a
 * resync as C_SYNC_SOURCE is started once the lock has been released.
 */
4276 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4278 struct drbd_peer_device *peer_device;
4279 struct drbd_device *device;
4280 struct bm_xfer_ctx c;
4283 peer_device = conn_peer_device(connection, pi->vnr);
4286 device = peer_device->device;
4288 drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4289 /* you are supposed to send additional out-of-sync information
4290 * if you actually set bits during this phase */
/* Fresh transfer context sized after the current bitmap. */
4292 c = (struct bm_xfer_ctx) {
4293 .bm_bits = drbd_bm_bits(device),
4294 .bm_words = drbd_bm_words(device),
/* Dispatch on the packet type. */
4298 if (pi->cmd == P_BITMAP)
4299 err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4300 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4301 /* MAYBE: sanity check that we speak proto >= 90,
4302 * and the feature is enabled! */
4303 struct p_compressed_bm *p = pi->data;
/* The payload has to fit the socket buffer and must be larger than the
 * bare struct p_compressed_bm header. */
4305 if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4306 drbd_err(device, "ReportCBitmap packet too large\n");
4310 if (pi->size <= sizeof(*p)) {
4311 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4315 err = drbd_recv_all(peer_device->connection, p, pi->size);
4318 err = decode_bitmap_c(peer_device, p, &c, pi->size);
/* NOTE(review): the message below has no trailing newline, unlike the
 * other log messages in this function. */
4320 drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
/* Statistics: index 1 counts plain P_BITMAP packets, index 0 all others. */
4325 c.packets[pi->cmd == P_BITMAP]++;
4326 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
/* Read the header of the next packet into pi. */
4333 err = drbd_recv_header(peer_device->connection, pi);
4338 INFO_bm_xfer_stats(device, "receive", &c);
4340 if (device->state.conn == C_WF_BITMAP_T) {
4341 enum drbd_state_rv rv;
4343 err = drbd_send_bitmap(device);
4346 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4347 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4348 D_ASSERT(device, rv == SS_SUCCESS);
4349 } else if (device->state.conn != C_WF_BITMAP_S) {
4350 /* admin may have requested C_DISCONNECTING,
4351 * other threads may have noticed network errors */
4352 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4353 drbd_conn_str(device->state.conn));
/* The resync is only started after the bitmap lock has been dropped and
 * only if no error occurred. */
4358 drbd_bm_unlock(device);
4359 if (!err && device->state.conn == C_WF_BITMAP_S)
4360 drbd_start_resync(device, C_SYNC_SOURCE);
/* Handler for optional packets that are not acted upon (registered for
 * P_DELAY_PROBE): logs a warning and returns the result of
 * ignore_remaining_packet(). */
4364 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4366 drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4369 return ignore_remaining_packet(connection, pi);
/* Handler registered for P_UNPLUG_REMOTE: calls drbd_tcp_quickack() on the
 * data socket; the packet info is not used in the visible code. */
4372 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4374 /* Make sure we've acked all the TCP data associated
4375 * with the data requests being unplugged */
4376 drbd_tcp_quickack(connection->data.socket);
/*
 * receive_out_of_sync() - handler registered for P_OUT_OF_SYNC.
 * @connection: connection the packet arrived on
 * @pi: packet info; pi->vnr selects the volume, pi->data is a struct p_block_desc
 *
 * Marks the block described by the packet (sector and size, both sent
 * big-endian) as out of sync. A connection state outside the set named in
 * the error message below is logged as a failed assertion.
 */
4381 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4383 struct drbd_peer_device *peer_device;
4384 struct drbd_device *device;
4385 struct p_block_desc *p = pi->data;
4387 peer_device = conn_peer_device(connection, pi->vnr);
4390 device = peer_device->device;
4392 switch (device->state.conn) {
4393 case C_WF_SYNC_UUID:
4398 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4399 drbd_conn_str(device->state.conn));
4402 drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
/* Handler callback of a command table entry; drbdd() invokes it with the
 * connection and the packet info of the packet just received. */
4410 int (*fn)(struct drbd_connection *, struct packet_info *);
/*
 * Dispatch table of the receiver, indexed by packet command. drbdd() reads
 * expect_payload, pkt_size and fn from the selected entry; the initializers
 * below presumably list them in that order (flag, fixed sub-header size,
 * handler) - TODO confirm against the struct data_cmd declaration. Commands
 * without an entry have fn == NULL and are rejected by drbdd().
 */
4413 static struct data_cmd drbd_cmd_handler[] = {
4414 [P_DATA] = { 1, sizeof(struct p_data), receive_Data },
4415 [P_DATA_REPLY] = { 1, sizeof(struct p_data), receive_DataReply },
4416 [P_RS_DATA_REPLY] = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4417 [P_BARRIER] = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4418 [P_BITMAP] = { 1, 0, receive_bitmap } ,
4419 [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4420 [P_UNPLUG_REMOTE] = { 0, 0, receive_UnplugRemote },
4421 [P_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4422 [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4423 [P_SYNC_PARAM] = { 1, 0, receive_SyncParam },
4424 [P_SYNC_PARAM89] = { 1, 0, receive_SyncParam },
4425 [P_PROTOCOL] = { 1, sizeof(struct p_protocol), receive_protocol },
4426 [P_UUIDS] = { 0, sizeof(struct p_uuids), receive_uuids },
4427 [P_SIZES] = { 0, sizeof(struct p_sizes), receive_sizes },
4428 [P_STATE] = { 0, sizeof(struct p_state), receive_state },
4429 [P_STATE_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_state },
4430 [P_SYNC_UUID] = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4431 [P_OV_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4432 [P_OV_REPLY] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4433 [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4434 [P_DELAY_PROBE] = { 0, sizeof(struct p_delay_probe93), receive_skip },
4435 [P_OUT_OF_SYNC] = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4436 [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4437 [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
/*
 * Main loop of the receiver thread for one connection.
 * @connection: connection whose data socket is served
 *
 * While the receiver thread state is RUNNING: read a packet header, look up
 * the command in drbd_cmd_handler[], validate the announced size against the
 * table entry, read the fixed sub-header and call the handler. The function
 * ends by requesting C_PROTOCOL_ERROR (presumably only on the error exit of
 * the loop - TODO confirm).
 */
4440 static void drbdd(struct drbd_connection *connection)
4442 struct packet_info pi;
4443 size_t shs; /* sub header size */
4446 while (get_t_state(&connection->receiver) == RUNNING) {
4447 struct data_cmd *cmd;
4449 drbd_thread_current_set_cpu(&connection->receiver);
4450 if (drbd_recv_header(connection, &pi))
/* NOTE(review): the entry pointer is formed before pi.cmd is range
 * checked; it is only dereferenced after the bounds test in the
 * condition below has passed. */
4453 cmd = &drbd_cmd_handler[pi.cmd];
4454 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4455 drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4456 cmdname(pi.cmd), pi.cmd);
/* A packet larger than the fixed sub-header is only acceptable when the
 * table entry expects payload. */
4460 shs = cmd->pkt_size;
4461 if (pi.size > shs && !cmd->expect_payload) {
4462 drbd_err(connection, "No payload expected %s l:%d\n",
4463 cmdname(pi.cmd), pi.size);
/* Read the fixed sub-header into pi.data before calling the handler. */
4468 err = drbd_recv_all_warn(connection, pi.data, shs);
4474 err = cmd->fn(connection, &pi);
4476 drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4477 cmdname(pi.cmd), err, pi.size);
4484 conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
/*
 * Clean up after the loss of a connection.
 * @connection: connection to tear down
 *
 * Forces C_NETWORK_FAILURE, stops the asender thread, frees the sockets, runs
 * drbd_disconnected() for every peer device, resets the epoch size and
 * finally moves the connection to C_UNCONNECTED or, when the state seen
 * under the lock was C_DISCONNECTING, to C_STANDALONE.
 */
4487 static void conn_disconnect(struct drbd_connection *connection)
4489 struct drbd_peer_device *peer_device;
4493 if (connection->cstate == C_STANDALONE)
4496 /* We are about to start the cleanup after connection loss.
4497 * Make sure drbd_make_request knows about that.
4498 * Usually we should be in some network failure state already,
4499 * but just in case we are not, we fix it up here.
4501 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4503 /* asender does not clean up anything. it must not interfere, either */
4504 drbd_thread_stop(&connection->asender);
4505 drbd_free_sock(connection);
/* Per-volume cleanup; a device reference is held across each call. */
4508 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4509 struct drbd_device *device = peer_device->device;
4510 kref_get(&device->kref);
4512 drbd_disconnected(peer_device);
4513 kref_put(&device->kref, drbd_destroy_device);
4518 if (!list_empty(&connection->current_epoch->list))
4519 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4520 /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4521 atomic_set(&connection->current_epoch->epoch_size, 0);
4522 connection->send.seen_any_write_yet = false;
4524 drbd_info(connection, "Connection closed\n");
/* With a primary on this side and a highest peer disk state of at least
 * D_UNKNOWN, outdating the peer is requested asynchronously. */
4526 if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4527 conn_try_outdate_peer_async(connection);
/* Sample the connection state under req_lock and drop to C_UNCONNECTED
 * if it is not already below that. */
4529 spin_lock_irq(&connection->resource->req_lock);
4530 oc = connection->cstate;
4531 if (oc >= C_UNCONNECTED)
4532 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4534 spin_unlock_irq(&connection->resource->req_lock);
/* A disconnect that was in progress ends in C_STANDALONE. */
4536 if (oc == C_DISCONNECTING)
4537 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
/*
 * Per-volume cleanup after connection loss, called from conn_disconnect().
 * @peer_device: peer device whose connection went away
 *
 * Waits for the active, sync and read peer request lists to drain, cancels
 * resync bookkeeping, flushes the sender work queue around
 * drbd_finish_peer_reqs(), drops the stored peer UUIDs, syncs the meta data
 * and finally frees what is left on net_ee, reporting leftover page counts.
 */
4540 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4542 struct drbd_device *device = peer_device->device;
4545 /* wait for current activity to cease. */
4546 spin_lock_irq(&device->resource->req_lock);
4547 _drbd_wait_ee_list_empty(device, &device->active_ee);
4548 _drbd_wait_ee_list_empty(device, &device->sync_ee);
4549 _drbd_wait_ee_list_empty(device, &device->read_ee);
4550 spin_unlock_irq(&device->resource->req_lock);
4552 /* We do not have data structures that would allow us to
4553 * get the rs_pending_cnt down to 0 again.
4554 * * On C_SYNC_TARGET we do not have any data structures describing
4555 * the pending RSDataRequest's we have sent.
4556 * * On C_SYNC_SOURCE there is no data structure that tracks
4557 * the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4558 * And no, it is not the sum of the reference counts in the
4559 * resync_LRU. The resync_LRU tracks the whole operation including
4560 * the disk-IO, while the rs_pending_cnt only tracks the blocks
4562 drbd_rs_cancel_all(device);
4563 device->rs_total = 0;
4564 device->rs_failed = 0;
4565 atomic_set(&device->rs_pending_cnt, 0);
4566 wake_up(&device->misc_wait);
4568 del_timer_sync(&device->resync_timer);
4569 resync_timer_fn((unsigned long)device);
4571 /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4572 * w_make_resync_request etc. which may still be on the worker queue
4573 * to be "canceled" */
4574 drbd_flush_workqueue(&peer_device->connection->sender_work);
4576 drbd_finish_peer_reqs(device);
4578 /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4579 might have issued a work again. The one before drbd_finish_peer_reqs() is
4580 necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4581 drbd_flush_workqueue(&peer_device->connection->sender_work);
4583 /* need to do it again, drbd_finish_peer_reqs() may have populated it
4584 * again via drbd_try_clear_on_disk_bm(). */
4585 drbd_rs_cancel_all(device);
/* Drop the stored peer UUIDs; receive_state() tests device->p_uuid before
 * it evaluates a sync handshake. */
4587 kfree(device->p_uuid);
4588 device->p_uuid = NULL;
/* tl_clear() is skipped while drbd_suspended() reports true. */
4590 if (!drbd_suspended(device))
4591 tl_clear(peer_device->connection);
4593 drbd_md_sync(device);
4595 /* serialize with bitmap writeout triggered by the state change,
4597 wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4599 /* tcp_close and release of sendpage pages can be deferred. I don't
4600 * want to use SO_LINGER, because apparently it can be deferred for
4601 * more than 20 seconds (longest time I checked).
4603 * Actually we don't care for exactly when the network stack does its
4604 * put_page(), but release our reference on these pages right here.
4606 i = drbd_free_peer_reqs(device, &device->net_ee);
4608 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4609 i = atomic_read(&device->pp_in_use_by_net);
4611 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4612 i = atomic_read(&device->pp_in_use);
4614 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4616 D_ASSERT(device, list_empty(&device->read_ee));
4617 D_ASSERT(device, list_empty(&device->active_ee));
4618 D_ASSERT(device, list_empty(&device->sync_ee));
4619 D_ASSERT(device, list_empty(&device->done_ee));
4625 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4626 * we can agree on is stored in agreed_pro_version.
4628 * feature flags and the reserved array should be enough room for future
4629 * enhancements of the handshake protocol, and possible plugins...
4631 * for now, they are expected to be zero, but ignored.
/*
 * Send the P_CONNECTION_FEATURES handshake packet on the data socket.
 * @connection: connection to send on
 *
 * The packet is zeroed and then carries PRO_VERSION_MIN and PRO_VERSION_MAX
 * in big-endian byte order.
 *
 * Return: the result of conn_send_command().
 */
4633 static int drbd_send_features(struct drbd_connection *connection)
4635 struct drbd_socket *sock;
4636 struct p_connection_features *p;
4638 sock = &connection->data;
4639 p = conn_prepare_command(connection, sock);
/* Zero everything first so that fields not set below go out as zero. */
4642 memset(p, 0, sizeof(*p));
4643 p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4644 p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4645 return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4650 * 1 yes, we have a valid connection
4651 * 0 oops, did not work out, please try again
4652 * -1 peer talks different language,
4653 * no point in trying again, please go standalone.
/*
 * Exchange P_CONNECTION_FEATURES packets with the peer and agree on a
 * protocol version.
 * @connection: connection the handshake runs on
 *
 * Sends the local version range, receives the range of the peer, checks
 * packet type and length, and stores the highest version supported by both
 * sides in connection->agreed_pro_version. Ranges that do not overlap are
 * reported as incompatible dialects.
 */
4655 static int drbd_do_features(struct drbd_connection *connection)
4657 /* ASSERT current == connection->receiver ... */
4658 struct p_connection_features *p;
4659 const int expect = sizeof(struct p_connection_features);
4660 struct packet_info pi;
4663 err = drbd_send_features(connection);
4667 err = drbd_recv_header(connection, &pi);
4671 if (pi.cmd != P_CONNECTION_FEATURES) {
4672 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4673 cmdname(pi.cmd), pi.cmd);
4677 if (pi.size != expect) {
4678 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4684 err = drbd_recv_all_warn(connection, p, expect);
/* The received limits are converted to host order in place. */
4688 p->protocol_min = be32_to_cpu(p->protocol_min);
4689 p->protocol_max = be32_to_cpu(p->protocol_max);
/* A peer that reports protocol_max == 0 is taken to support exactly
 * protocol_min. */
4690 if (p->protocol_max == 0)
4691 p->protocol_max = p->protocol_min;
/* No overlap between the local range and the range of the peer. */
4693 if (PRO_VERSION_MAX < p->protocol_min ||
4694 PRO_VERSION_MIN > p->protocol_max)
/* Highest version both sides support. */
4697 connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4699 drbd_info(connection, "Handshake successful: "
4700 "Agreed network protocol version %d\n", connection->agreed_pro_version);
4705 drbd_err(connection, "incompatible DRBD dialects: "
4706 "I support %d-%d, peer supports %d-%d\n",
4707 PRO_VERSION_MIN, PRO_VERSION_MAX,
4708 p->protocol_min, p->protocol_max);
/* Variant of drbd_do_auth() compiled when neither CONFIG_CRYPTO_HMAC nor
 * CONFIG_CRYPTO_HMAC_MODULE is defined: it only logs that authentication
 * is unavailable and that cram-hmac-alg has to be disabled. */
4712 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4713 static int drbd_do_auth(struct drbd_connection *connection)
4715 drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
4716 drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
/* Size in bytes of the random challenge that drbd_do_auth() generates and
 * sends; a received challenge may be at most twice this size. */
4720 #define CHALLENGE_LEN 64
4724 0 - failed, try again (network error),
4725 -1 - auth failed, don't try again.
/*
 * drbd_do_auth() - mutual challenge/response authentication with the peer.
 *
 * Visible sequence:
 *   1. copy the shared secret out of net_conf and set it as the key of
 *      connection->cram_hmac_tfm;
 *   2. send CHALLENGE_LEN random bytes as P_AUTH_CHALLENGE;
 *   3. receive the peer's P_AUTH_CHALLENGE (at most 2 * CHALLENGE_LEN bytes);
 *   4. send the keyed digest of the peer's challenge as P_AUTH_RESPONSE;
 *   5. receive the peer's P_AUTH_RESPONSE (exactly one digest in size);
 *   6. compute the digest of our own challenge and compare the two.
 *
 * The comment fragment preceding this function documents 0 as "failed, try
 * again" and -1 as "auth failed, don't try again".
 *
 * NOTE(review): short lines (locking, error branches, gotos, most of the
 * cleanup at the end) are not part of this listing; confirm the error paths
 * and buffer lifetimes against the complete file.
 */
4728 static int drbd_do_auth(struct drbd_connection *connection)
4730 struct drbd_socket *sock;
4731 char my_challenge[CHALLENGE_LEN]; /* 64 Bytes... */
4732 struct scatterlist sg;
4733 char *response = NULL;
4734 char *right_response = NULL;
4735 char *peers_ch = NULL;
4736 unsigned int key_len;
4737 char secret[SHARED_SECRET_MAX]; /* 64 byte */
4738 unsigned int resp_size;
4739 struct hash_desc desc;
4740 struct packet_info pi;
4741 struct net_conf *nc;
4744 /* FIXME: Put the challenge/response into the preallocated socket buffer. */
/*
 * Work on a private copy of the shared secret.
 * NOTE(review): no visible line clears this stack copy afterwards.
 */
4747 nc = rcu_dereference(connection->net_conf);
4748 key_len = strlen(nc->shared_secret);
4749 memcpy(secret, nc->shared_secret, key_len);
4752 desc.tfm = connection->cram_hmac_tfm;
4755 rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4757 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
/* Our challenge: CHALLENGE_LEN random bytes, sent on the data socket. */
4762 get_random_bytes(my_challenge, CHALLENGE_LEN);
4764 sock = &connection->data;
4765 if (!conn_prepare_command(connection, sock)) {
/* rv is 1 when conn_send_command() returned 0, and 0 otherwise. */
4769 rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4770 my_challenge, CHALLENGE_LEN);
4774 err = drbd_recv_header(connection, &pi);
4780 if (pi.cmd != P_AUTH_CHALLENGE) {
4781 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4782 cmdname(pi.cmd), pi.cmd);
/* Bound the peer's challenge before a buffer of that size is allocated. */
4787 if (pi.size > CHALLENGE_LEN * 2) {
4788 drbd_err(connection, "expected AuthChallenge payload too big.\n");
4793 peers_ch = kmalloc(pi.size, GFP_NOIO);
4794 if (peers_ch == NULL) {
4795 drbd_err(connection, "kmalloc of peers_ch failed\n");
4800 err = drbd_recv_all_warn(connection, peers_ch, pi.size);
/* Our answer is the keyed digest of the peer's challenge. */
4806 resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4807 response = kmalloc(resp_size, GFP_NOIO);
4808 if (response == NULL) {
4809 drbd_err(connection, "kmalloc of response failed\n");
4814 sg_init_table(&sg, 1);
4815 sg_set_buf(&sg, peers_ch, pi.size);
4817 rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4819 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4824 if (!conn_prepare_command(connection, sock)) {
4828 rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
4829 response, resp_size);
4833 err = drbd_recv_header(connection, &pi);
4839 if (pi.cmd != P_AUTH_RESPONSE) {
4840 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
4841 cmdname(pi.cmd), pi.cmd);
4846 if (pi.size != resp_size) {
4847 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
/* The response buffer is reused: it now receives the peer's answer. */
4852 err = drbd_recv_all_warn(connection, response , resp_size);
/* Compute what the peer should have answered to our own challenge. */
4858 right_response = kmalloc(resp_size, GFP_NOIO);
4859 if (right_response == NULL) {
4860 drbd_err(connection, "kmalloc of right_response failed\n");
4865 sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4867 rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4869 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
/*
 * rv becomes 1 when both digests are equal and 0 otherwise.
 * NOTE(review): memcmp() is not a constant-time comparison; consider
 * crypto_memneq() if the target kernel provides it.
 */
4874 rv = !memcmp(response, right_response, resp_size);
4877 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
4885 kfree(right_response);
/*
 * drbd_receiver() - thread function of a connection's receiver thread.
 *
 * Logs the (re)start, tries to establish the connection via conn_connect(),
 * and logs termination after conn_disconnect().
 *
 * NOTE(review): the loop construct and the conditions on h are not part of
 * this listing. The visible lines suggest one outcome of conn_connect()
 * leads to disconnect, a one-second interruptible sleep (HZ jiffies) and a
 * retry, and another to discarding the network configuration; confirm which
 * value of h selects which branch.
 */
4891 int drbd_receiver(struct drbd_thread *thi)
4893 struct drbd_connection *connection = thi->connection;
4896 drbd_info(connection, "receiver (re)started\n");
4899 h = conn_connect(connection);
4901 conn_disconnect(connection);
4902 schedule_timeout_interruptible(HZ);
4905 drbd_warn(connection, "Discarding network configuration.\n");
4906 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
4913 conn_disconnect(connection);
4915 drbd_info(connection, "receiver terminated\n");
4919 /* ********* acknowledge sender ******** */
/*
 * got_conn_RqSReply() - handle the peer's reply to a connection-wide state
 * change request.
 *
 * Reads the big-endian retcode from the packet payload, records the outcome
 * in connection->flags (CONN_WD_ST_CHG_OKAY when retcode >= SS_SUCCESS,
 * CONN_WD_ST_CHG_FAIL plus an error log otherwise) and wakes up waiters on
 * connection->ping_wait.
 * NOTE(review): the return statement is not part of this listing.
 */
4921 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4923 struct p_req_state_reply *p = pi->data;
4924 int retcode = be32_to_cpu(p->retcode);
4926 if (retcode >= SS_SUCCESS) {
4927 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
4929 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
4930 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
4931 drbd_set_st_err_str(retcode), retcode);
4933 wake_up(&connection->ping_wait);
/*
 * got_RqSReply() - handle the peer's reply to a per-device state change
 * request.
 *
 * Resolves the peer device from pi->vnr. While CONN_WD_ST_CHG_REQ is set on
 * the connection, the reply is forwarded to got_conn_RqSReply() (asserting
 * agreed_pro_version < 100). Otherwise the outcome is recorded in
 * device->flags (CL_ST_CHG_SUCCESS or CL_ST_CHG_FAIL plus an error log) and
 * waiters on device->state_wait are woken up.
 * NOTE(review): the handling of a failed peer-device lookup and the final
 * return are not part of this listing.
 */
4938 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
4940 struct drbd_peer_device *peer_device;
4941 struct drbd_device *device;
4942 struct p_req_state_reply *p = pi->data;
4943 int retcode = be32_to_cpu(p->retcode);
4945 peer_device = conn_peer_device(connection, pi->vnr);
4948 device = peer_device->device;
/* A pending connection-wide request takes precedence over the per-device handling. */
4950 if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
4951 D_ASSERT(device, connection->agreed_pro_version < 100);
4952 return got_conn_RqSReply(connection, pi);
4955 if (retcode >= SS_SUCCESS) {
4956 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
4958 set_bit(CL_ST_CHG_FAIL, &device->flags);
4959 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
4960 drbd_set_st_err_str(retcode), retcode);
4962 wake_up(&device->state_wait);
/*
 * got_Ping() - answer a ping from the peer.
 * Returns whatever drbd_send_ping_ack() returns; the packet itself is unused.
 */
4967 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
4969 return drbd_send_ping_ack(connection);
/*
 * got_PingAck() - handle the peer's ping acknowledgement.
 *
 * Resets the meta socket's receive timeout to ping_int seconds and sets
 * GOT_PING_ACK; waiters on connection->ping_wait are woken only if the flag
 * was not already set.
 * NOTE(review): net_conf is dereferenced directly here, whereas other blocks
 * in this file go through rcu_dereference(); confirm this is safe in this
 * context. The return statement is not part of this listing.
 */
4973 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
4975 /* restore idle timeout */
4976 connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
4977 if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
4978 wake_up(&connection->ping_wait);
/*
 * got_IsInSync() - the peer reports that a resync block is already in sync.
 *
 * Decodes sector and block size from the p_block_ack payload and updates the
 * peer sequence number. With the local disk reference held (get_ldev), it
 * completes the resync I/O for that sector, marks the range in sync and
 * adds the block to rs_same_csum. Finally it drops one pending resync
 * request and adds the size, in 512-byte sectors (blksize >> 9), to
 * rs_sect_in.
 * NOTE(review): the failed-lookup branch, the release of the ldev reference
 * and the return are not part of this listing.
 */
4983 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
4985 struct drbd_peer_device *peer_device;
4986 struct drbd_device *device;
4987 struct p_block_ack *p = pi->data;
4988 sector_t sector = be64_to_cpu(p->sector);
4989 int blksize = be32_to_cpu(p->blksize);
4991 peer_device = conn_peer_device(connection, pi->vnr);
4994 device = peer_device->device;
4996 D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
4998 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5000 if (get_ldev(device)) {
5001 drbd_rs_complete_io(device, sector);
5002 drbd_set_in_sync(device, sector, blksize);
5003 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5004 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5007 dec_rs_pending(device);
5008 atomic_add(blksize >> 9, &device->rs_sect_in);
/*
 * validate_req_change_req_state() - look up a request and apply an event.
 *
 * @device:     device the request belongs to
 * @id:         request identifier passed to find_request()
 * @sector:     sector passed to find_request()
 * @root:       rb-tree to search (callers pass the device's read_requests or
 *              write_requests tree)
 * @func:       caller name, forwarded to find_request()
 * @what:       event applied to the request via __req_mod()
 * @missing_ok: forwarded to find_request()
 *
 * Lookup and state change happen under resource->req_lock; the master bio
 * is completed after the lock has been dropped.
 * NOTE(review): the return type line, the return values and any condition
 * guarding complete_master_bio() are not part of this listing.
 */
5014 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5015 struct rb_root *root, const char *func,
5016 enum drbd_req_event what, bool missing_ok)
5018 struct drbd_request *req;
5019 struct bio_and_error m;
5021 spin_lock_irq(&device->resource->req_lock);
5022 req = find_request(device, root, id, sector, missing_ok, func);
/* No matching request: release the lock before leaving. */
5023 if (unlikely(!req)) {
5024 spin_unlock_irq(&device->resource->req_lock);
5027 __req_mod(req, what, &m);
5028 spin_unlock_irq(&device->resource->req_lock);
5031 complete_master_bio(device, &m);
/*
 * got_BlockAck() - handle a positive block acknowledgement from the peer.
 *
 * Serves several packet types (see asender_tbl). For resync blocks
 * (block_id == ID_SYNCER) the range is marked in sync and one pending resync
 * request is dropped. For application writes the packet type is translated
 * into a request event, which is applied to the matching entry of
 * device->write_requests through validate_req_change_req_state().
 *
 * NOTE(review): the switch statement and most of its case labels are not
 * part of this listing, so the packet-to-event mapping is only partly
 * visible (P_RS_WRITE_ACK -> WRITE_ACKED_BY_PEER_AND_SIS); confirm the rest
 * against the complete file.
 */
5035 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5037 struct drbd_peer_device *peer_device;
5038 struct drbd_device *device;
5039 struct p_block_ack *p = pi->data;
5040 sector_t sector = be64_to_cpu(p->sector);
5041 int blksize = be32_to_cpu(p->blksize);
5042 enum drbd_req_event what;
5044 peer_device = conn_peer_device(connection, pi->vnr);
5047 device = peer_device->device;
5049 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
/* Acknowledgement for a resync write rather than an application request. */
5051 if (p->block_id == ID_SYNCER) {
5052 drbd_set_in_sync(device, sector, blksize);
5053 dec_rs_pending(device);
5057 case P_RS_WRITE_ACK:
5058 what = WRITE_ACKED_BY_PEER_AND_SIS;
5061 what = WRITE_ACKED_BY_PEER;
5064 what = RECV_ACKED_BY_PEER;
5067 what = CONFLICT_RESOLVED;
5070 what = POSTPONE_WRITE;
5076 return validate_req_change_req_state(device, p->block_id, sector,
5077 &device->write_requests, __func__,
/*
 * got_NegAck() - handle a negative acknowledgement for a write.
 *
 * For resync blocks (block_id == ID_SYNCER) one pending resync request is
 * dropped and the range is reported as failed resync I/O. For application
 * writes the event is applied to the matching entry of
 * device->write_requests; per the comments below, the request may already
 * be gone, in which case the range is marked out of sync.
 *
 * NOTE(review): the event passed to validate_req_change_req_state(), the
 * condition guarding drbd_set_out_of_sync() and the return statements are
 * not part of this listing.
 */
5081 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5083 struct drbd_peer_device *peer_device;
5084 struct drbd_device *device;
5085 struct p_block_ack *p = pi->data;
5086 sector_t sector = be64_to_cpu(p->sector);
5087 int size = be32_to_cpu(p->blksize);
5090 peer_device = conn_peer_device(connection, pi->vnr);
5093 device = peer_device->device;
5095 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5097 if (p->block_id == ID_SYNCER) {
5098 dec_rs_pending(device);
5099 drbd_rs_failed_io(device, sector, size);
5103 err = validate_req_change_req_state(device, p->block_id, sector,
5104 &device->write_requests, __func__,
5107 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5108 The master bio might already be completed, therefore the
5109 request is no longer in the collision hash. */
5110 /* In Protocol B we might already have got a P_RECV_ACK
5111 but then get a P_NEG_ACK afterwards. */
5112 drbd_set_out_of_sync(device, sector, size);
/*
 * got_NegDReply() - the peer could not serve a read request.
 *
 * Updates the peer sequence number, logs sector and length of the failed
 * read, and applies an event to the matching entry of device->read_requests
 * through validate_req_change_req_state().
 * NOTE(review): the event and the missing_ok argument are on a line that is
 * not part of this listing.
 */
5117 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5119 struct drbd_peer_device *peer_device;
5120 struct drbd_device *device;
5121 struct p_block_ack *p = pi->data;
5122 sector_t sector = be64_to_cpu(p->sector);
5124 peer_device = conn_peer_device(connection, pi->vnr);
5127 device = peer_device->device;
5129 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5131 drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5132 (unsigned long long)sector, be32_to_cpu(p->blksize));
5134 return validate_req_change_req_state(device, p->block_id, sector,
5135 &device->read_requests, __func__,
/*
 * got_NegRSDReply() - the peer answered a resync read request negatively.
 *
 * Installed in asender_tbl for both P_NEG_RS_DREPLY and P_RS_CANCEL. Drops
 * one pending resync request; if the local disk is at least in state
 * D_FAILED (get_ldev_if_state), the resync I/O for the sector is completed
 * and, for P_NEG_RS_DREPLY, the range is recorded as failed resync I/O.
 *
 * NOTE(review): the switch statement, the remaining case labels, the
 * release of the ldev reference and the return are not part of this listing.
 */
5139 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5141 struct drbd_peer_device *peer_device;
5142 struct drbd_device *device;
5145 struct p_block_ack *p = pi->data;
5147 peer_device = conn_peer_device(connection, pi->vnr);
5150 device = peer_device->device;
5152 sector = be64_to_cpu(p->sector);
5153 size = be32_to_cpu(p->blksize);
5155 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5157 dec_rs_pending(device);
5159 if (get_ldev_if_state(device, D_FAILED)) {
5160 drbd_rs_complete_io(device, sector);
5162 case P_NEG_RS_DREPLY:
5163 drbd_rs_failed_io(device, sector, size);
/*
 * got_BarrierAck() - the peer acknowledged a barrier.
 *
 * Passes the barrier number and the (big-endian decoded) set size to
 * tl_release(), then walks all peer devices of the connection: a device in
 * C_AHEAD with no application I/O in flight, and whose
 * AHEAD_TO_SYNC_SOURCE flag was not yet set, gets its start_resync_timer
 * armed to fire one second (HZ jiffies) from now.
 * NOTE(review): any locking around the iteration and the return are not
 * part of this listing.
 */
5175 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5177 struct p_barrier_ack *p = pi->data;
5178 struct drbd_peer_device *peer_device;
5181 tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5184 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5185 struct drbd_device *device = peer_device->device;
/* test_and_set_bit() ensures the timer is armed only once per device. */
5187 if (device->state.conn == C_AHEAD &&
5188 atomic_read(&device->ap_in_flight) == 0 &&
5189 !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5190 device->start_resync_timer.expires = jiffies + HZ;
5191 add_timer(&device->start_resync_timer);
/*
 * got_OVResult() - handle the peer's result for one online-verify request.
 *
 * A block_id of ID_OUT_OF_SYNC records the range as out of sync; otherwise
 * the accumulated out-of-sync report is printed. With the local disk
 * reference held, the resync I/O for the sector is completed and one
 * pending resync request is dropped. Once ov_left reaches 0, a work item
 * running w_ov_finished is queued on the connection's sender_work; if that
 * allocation fails, the verify run is finished directly from here.
 *
 * NOTE(review): short lines (else branches, the update of ov_left, the
 * release of the ldev reference, returns) are not part of this listing.
 */
5199 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5201 struct drbd_peer_device *peer_device;
5202 struct drbd_device *device;
5203 struct p_block_ack *p = pi->data;
5204 struct drbd_device_work *dw;
5208 peer_device = conn_peer_device(connection, pi->vnr);
5211 device = peer_device->device;
5213 sector = be64_to_cpu(p->sector);
5214 size = be32_to_cpu(p->blksize);
5216 update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5218 if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5219 drbd_ov_out_of_sync_found(device, sector, size);
5221 ov_out_of_sync_print(device);
5223 if (!get_ldev(device))
5226 drbd_rs_complete_io(device, sector);
5227 dec_rs_pending(device);
5231 /* let's advance progress step marks only for every other megabyte */
5232 if ((device->ov_left & 0x200) == 0x200)
5233 drbd_advance_rs_marks(device, device->ov_left);
/* Last outstanding verify request: hand completion over to the sender work queue. */
5235 if (device->ov_left == 0) {
5236 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5238 dw->w.cb = w_ov_finished;
5239 dw->device = device;
5240 drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
/* Allocation failed: terminate the log line properly and finish inline. */
5242 drbd_err(device, "kmalloc(dw) failed.\n");
5243 ov_out_of_sync_print(device);
5244 drbd_resync_finished(device);
/*
 * got_skip() - handler installed in asender_tbl for P_DELAY_PROBE.
 * NOTE(review): the body is not part of this listing; the name suggests the
 * packet is ignored, confirm against the complete file.
 */
5251 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
/*
 * connection_finish_peer_reqs() - run drbd_finish_peer_reqs() for every peer
 * device of the connection until no device has entries left on done_ee.
 *
 * Each pass clears SIGNAL_ASENDER and flushes pending signals, processes all
 * devices (holding a kref on each device across the call), sets
 * SIGNAL_ASENDER again and then, under resource->req_lock, checks the
 * done_ee lists; the pass is repeated while not_empty is set.
 *
 * NOTE(review): the opening of the do-loop, any locking around the first
 * iteration, the early exit after a failed drbd_finish_peer_reqs() and the
 * return values are not part of this listing.
 */
5256 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5258 struct drbd_peer_device *peer_device;
5259 int vnr, not_empty = 0;
5262 clear_bit(SIGNAL_ASENDER, &connection->flags);
5263 flush_signals(current);
5266 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5267 struct drbd_device *device = peer_device->device;
/* Pin the device while its finished peer requests are processed. */
5268 kref_get(&device->kref);
5270 if (drbd_finish_peer_reqs(device)) {
5271 kref_put(&device->kref, drbd_destroy_device);
5274 kref_put(&device->kref, drbd_destroy_device);
5277 set_bit(SIGNAL_ASENDER, &connection->flags);
/* Re-check under the request lock whether new entries arrived meanwhile. */
5279 spin_lock_irq(&connection->resource->req_lock);
5280 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5281 struct drbd_device *device = peer_device->device;
5282 not_empty = !list_empty(&device->done_ee);
5286 spin_unlock_irq(&connection->resource->req_lock);
5288 } while (not_empty);
/*
 * One entry of the asender dispatch table: the handler invoked for a meta
 * packet. drbd_asender() also reads a pkt_size member (expected payload
 * size); its declaration is on a line that is not part of this listing.
 */
5293 struct asender_cmd {
5295 int (*fn)(struct drbd_connection *connection, struct packet_info *);
/*
 * Dispatch table for packets arriving on the meta socket, indexed by packet
 * command. Each entry gives the expected payload size (0 for the ping
 * packets) and the handler. Commands without an entry have a NULL handler
 * and are rejected by drbd_asender() as unexpected.
 */
5298 static struct asender_cmd asender_tbl[] = {
5299 [P_PING] = { 0, got_Ping },
5300 [P_PING_ACK] = { 0, got_PingAck },
5301 [P_RECV_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5302 [P_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5303 [P_RS_WRITE_ACK] = { sizeof(struct p_block_ack), got_BlockAck },
5304 [P_SUPERSEDED] = { sizeof(struct p_block_ack), got_BlockAck },
5305 [P_NEG_ACK] = { sizeof(struct p_block_ack), got_NegAck },
5306 [P_NEG_DREPLY] = { sizeof(struct p_block_ack), got_NegDReply },
5307 [P_NEG_RS_DREPLY] = { sizeof(struct p_block_ack), got_NegRSDReply },
5308 [P_OV_RESULT] = { sizeof(struct p_block_ack), got_OVResult },
5309 [P_BARRIER_ACK] = { sizeof(struct p_barrier_ack), got_BarrierAck },
5310 [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5311 [P_RS_IS_IN_SYNC] = { sizeof(struct p_block_ack), got_IsInSync },
5312 [P_DELAY_PROBE] = { sizeof(struct p_delay_probe93), got_skip },
5313 [P_RS_CANCEL] = { sizeof(struct p_block_ack), got_NegRSDReply },
5314 [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5315 [P_RETRY_WRITE] = { sizeof(struct p_block_ack), got_BlockAck },
/*
 * drbd_asender() - thread function of the connection's asender thread.
 *
 * While the thread state is RUNNING, each iteration
 *   - sends a ping if SEND_PING is set and shortens the meta socket's
 *     receive timeout to ping_timeo tenths of a second,
 *   - finishes completed peer requests between cork and uncork of the meta
 *     socket,
 *   - receives whatever is still missing of the current header or packet,
 *   - once a header is complete, looks the command up in asender_tbl and
 *     checks the announced payload size against the table,
 *   - once the whole packet is complete, calls the handler and resets the
 *     receive state for the next header.
 * After the loop the connection is moved to C_NETWORK_FAILURE or
 * C_DISCONNECTING and SIGNAL_ASENDER is cleared.
 *
 * NOTE(review): short lines (braces, gotos, labels, continue statements,
 * updates of received/buf, the return) are not part of this listing, so the
 * exact exit paths must be confirmed against the complete file.
 */
5318 int drbd_asender(struct drbd_thread *thi)
5320 struct drbd_connection *connection = thi->connection;
5321 struct asender_cmd *cmd = NULL;
5322 struct packet_info pi;
5324 void *buf = connection->meta.rbuf;
5326 unsigned int header_size = drbd_header_size(connection);
5327 int expect = header_size;
5328 bool ping_timeout_active = false;
5329 struct net_conf *nc;
5330 int ping_timeo, tcp_cork, ping_int;
5331 struct sched_param param = { .sched_priority = 2 };
/* Request SCHED_RR scheduling with the priority set up in param. */
5333 rv = sched_setscheduler(current, SCHED_RR, &param);
5335 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5337 while (get_t_state(thi) == RUNNING) {
5338 drbd_thread_current_set_cpu(thi);
/* Snapshot the tunables used during this iteration. */
5341 nc = rcu_dereference(connection->net_conf);
5342 ping_timeo = nc->ping_timeo;
5343 tcp_cork = nc->tcp_cork;
5344 ping_int = nc->ping_int;
/* A ping was requested: send it and wait for the ack with the short timeout. */
5347 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5348 if (drbd_send_ping(connection)) {
5349 drbd_err(connection, "drbd_send_ping has failed\n");
5352 connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5353 ping_timeout_active = true;
5356 /* TODO: conditionally cork; it may hurt latency if we cork without
5359 drbd_tcp_cork(connection->meta.socket);
5360 if (connection_finish_peer_reqs(connection)) {
5361 drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5364 /* but unconditionally uncork unless disabled */
5366 drbd_tcp_uncork(connection->meta.socket);
5368 /* short circuit, recv_msg would return EINTR anyways. */
5369 if (signal_pending(current))
/* Ask only for the bytes still missing of the current header/packet. */
5372 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5373 clear_bit(SIGNAL_ASENDER, &connection->flags);
5375 flush_signals(current);
5378 * -EINTR (on meta) we got a signal
5379 * -EAGAIN (on meta) rcvtimeo expired
5380 * -ECONNRESET other side closed the connection
5381 * -ERESTARTSYS (on data) we got a signal
5382 * rv < 0 other than above: unexpected error!
5383 * rv == expected: full header or command
5384 * rv < expected: "woken" by signal during receive
5385 * rv == 0 : "connection shut down by peer"
5387 if (likely(rv > 0)) {
5390 } else if (rv == 0) {
5391 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5394 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5397 t = wait_event_timeout(connection->ping_wait,
5398 connection->cstate < C_WF_REPORT_PARAMS,
5403 drbd_err(connection, "meta connection shut down by peer.\n");
5405 } else if (rv == -EAGAIN) {
5406 /* If the data socket received something meanwhile,
5407 * that is good enough: peer is still alive. */
5408 if (time_after(connection->last_received,
5409 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5411 if (ping_timeout_active) {
5412 drbd_err(connection, "PingAck did not arrive in time.\n");
5415 set_bit(SEND_PING, &connection->flags);
5417 } else if (rv == -EINTR) {
5420 drbd_err(connection, "sock_recvmsg returned %d\n", rv);
/* Header complete and not decoded yet: find the handler and the payload size. */
5424 if (received == expect && cmd == NULL) {
5425 if (decode_header(connection, connection->meta.rbuf, &pi))
/*
 * NOTE(review): the entry address is formed before pi.cmd is range-checked;
 * it is only dereferenced (cmd->fn) after the bounds test has passed.
 */
5427 cmd = &asender_tbl[pi.cmd];
5428 if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5429 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5430 cmdname(pi.cmd), pi.cmd);
5433 expect = header_size + cmd->pkt_size;
5434 if (pi.size != expect - header_size) {
5435 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
/* Whole packet received: dispatch it to its handler. */
5440 if (received == expect) {
5443 err = cmd->fn(connection, &pi);
5445 drbd_err(connection, "%pf failed\n", cmd->fn);
5449 connection->last_received = jiffies;
5451 if (cmd == &asender_tbl[P_PING_ACK]) {
5452 /* restore idle timeout */
5453 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5454 ping_timeout_active = false;
/* Start over with a fresh header. */
5457 buf = connection->meta.rbuf;
5459 expect = header_size;
5466 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5467 conn_md_sync(connection);
5471 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5473 clear_bit(SIGNAL_ASENDER, &connection->flags);
5475 drbd_info(connection, "asender terminated\n");