4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
11 from Logicworks, Inc. for making SDP replication support possible.
13 drbd is free software; you can redistribute it and/or modify
14 it under the terms of the GNU General Public License as published by
15 the Free Software Foundation; either version 2, or (at your option)
18 drbd is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 GNU General Public License for more details.
23 You should have received a copy of the GNU General Public License
24 along with drbd; see the file COPYING. If not, write to
25 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
29 #include <linux/module.h>
30 #include <linux/drbd.h>
31 #include <asm/uaccess.h>
32 #include <asm/types.h>
34 #include <linux/ctype.h>
35 #include <linux/mutex.h>
37 #include <linux/file.h>
38 #include <linux/proc_fs.h>
39 #include <linux/init.h>
41 #include <linux/memcontrol.h>
42 #include <linux/mm_inline.h>
43 #include <linux/slab.h>
44 #include <linux/random.h>
45 #include <linux/reboot.h>
46 #include <linux/notifier.h>
47 #include <linux/kthread.h>
49 #define __KERNEL_SYSCALLS__
50 #include <linux/unistd.h>
51 #include <linux/vmalloc.h>
53 #include <linux/drbd_limits.h>
55 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
59 static DEFINE_MUTEX(drbd_main_mutex);
60 int drbdd_init(struct drbd_thread *);
61 int drbd_worker(struct drbd_thread *);
62 int drbd_asender(struct drbd_thread *);
65 static int drbd_open(struct block_device *bdev, fmode_t mode);
66 static int drbd_release(struct gendisk *gd, fmode_t mode);
67 static int w_md_sync(struct drbd_work *w, int unused);
68 static void md_sync_timer_fn(unsigned long data);
69 static int w_bitmap_io(struct drbd_work *w, int unused);
70 static int w_go_diskless(struct drbd_work *w, int unused);
72 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
73 "Lars Ellenberg <lars@linbit.com>");
74 MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
75 MODULE_VERSION(REL_VERSION);
76 MODULE_LICENSE("GPL");
77 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
78 __stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
79 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
81 #include <linux/moduleparam.h>
82 /* allow_open_on_secondary */
83 MODULE_PARM_DESC(allow_oos, "DONT USE!");
84 /* thanks to these macros, if compiled into the kernel (not-module),
85 * this becomes the boot parameter drbd.minor_count */
86 module_param(minor_count, uint, 0444);
87 module_param(disable_sendpage, bool, 0644);
88 module_param(allow_oos, bool, 0);
89 module_param(proc_details, int, 0644);
91 #ifdef CONFIG_DRBD_FAULT_INJECTION
94 static int fault_count;
96 /* bitmap of enabled faults */
97 module_param(enable_faults, int, 0664);
98 /* fault rate % value - applies to all enabled faults */
99 module_param(fault_rate, int, 0664);
100 /* count of faults inserted */
101 module_param(fault_count, int, 0664);
102 /* bitmap of devices to insert faults on */
103 module_param(fault_devs, int, 0644);
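/* Because these module_param()s are declared with a non-zero mode, they are
 * visible under /sys/module/drbd/parameters/ at runtime when built as a
 * module.  Purely as an illustration (values are arbitrary), the fault
 * injection machinery can then be exercised like this:
 *
 *   echo 2  > /sys/module/drbd/parameters/enable_faults   (bitmask of fault types)
 *   echo 10 > /sys/module/drbd/parameters/fault_rate      (about 10% of eligible ops fail)
 *
 * The meaning of the individual enable_faults bits is defined by the fault
 * type enumeration in the DRBD headers and is not repeated here. */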
106 /* module parameter, defined */
107 unsigned int minor_count = DRBD_MINOR_COUNT_DEF;
108 int disable_sendpage;
110 int proc_details; /* Detail level in proc drbd */
112 /* Module parameter for setting the user mode helper program
113 * to run. Default is /sbin/drbdadm */
114 char usermode_helper[80] = "/sbin/drbdadm";
116 module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
118 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
119 * as member "struct gendisk *vdisk;"
122 struct list_head drbd_tconns; /* list of struct drbd_tconn */
123 DECLARE_RWSEM(drbd_cfg_rwsem);
125 struct kmem_cache *drbd_request_cache;
126 struct kmem_cache *drbd_ee_cache; /* peer requests */
127 struct kmem_cache *drbd_bm_ext_cache; /* bitmap extents */
128 struct kmem_cache *drbd_al_ext_cache; /* activity log extents */
129 mempool_t *drbd_request_mempool;
130 mempool_t *drbd_ee_mempool;
131 mempool_t *drbd_md_io_page_pool;
132 struct bio_set *drbd_md_io_bio_set;
134 /* I do not use a standard mempool, because:
135 1) I want to hand out the pre-allocated objects first.
136 2) I want to be able to interrupt sleeping allocation with a signal.
137 Note: This is a singly linked list; the next pointer is the private
138 member of struct page.
140 struct page *drbd_pp_pool;
141 spinlock_t drbd_pp_lock;
143 wait_queue_head_t drbd_pp_wait;
145 DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
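/* i.e. allow at most 5 rate-limited messages per 5 second interval;
 * DEFINE_RATELIMIT_STATE(name, interval_in_jiffies, burst) */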
147 static const struct block_device_operations drbd_ops = {
148 .owner = THIS_MODULE,
150 .release = drbd_release,
153 static void bio_destructor_drbd(struct bio *bio)
155 bio_free(bio, drbd_md_io_bio_set);
158 struct bio *bio_alloc_drbd(gfp_t gfp_mask)
162 if (!drbd_md_io_bio_set)
163 return bio_alloc(gfp_mask, 1);
165 bio = bio_alloc_bioset(gfp_mask, 1, drbd_md_io_bio_set);
168 bio->bi_destructor = bio_destructor_drbd;
173 /* When checking with sparse, and this is an inline function, sparse will
174 give tons of false positives. When this is a real function, sparse works.
176 int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
180 atomic_inc(&mdev->local_cnt);
181 io_allowed = (mdev->state.disk >= mins);
183 if (atomic_dec_and_test(&mdev->local_cnt))
184 wake_up(&mdev->misc_wait);
192 * DOC: The transfer log
194 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
195 * mdev->tconn->newest_tle points to the head, mdev->tconn->oldest_tle points to the tail
196 * of the list. There is always at least one &struct drbd_tl_epoch object.
198 * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
201 static int tl_init(struct drbd_tconn *tconn)
203 struct drbd_tl_epoch *b;
205 /* during device minor initialization, we may well use GFP_KERNEL */
206 b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
209 INIT_LIST_HEAD(&b->requests);
210 INIT_LIST_HEAD(&b->w.list);
214 b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
216 tconn->oldest_tle = b;
217 tconn->newest_tle = b;
218 INIT_LIST_HEAD(&tconn->out_of_sequence_requests);
223 static void tl_cleanup(struct drbd_tconn *tconn)
225 if (tconn->oldest_tle != tconn->newest_tle)
226 conn_err(tconn, "ASSERT FAILED: oldest_tle == newest_tle\n");
227 if (!list_empty(&tconn->out_of_sequence_requests))
228 conn_err(tconn, "ASSERT FAILED: list_empty(out_of_sequence_requests)\n");
229 kfree(tconn->oldest_tle);
230 tconn->oldest_tle = NULL;
231 kfree(tconn->unused_spare_tle);
232 tconn->unused_spare_tle = NULL;
236 * _tl_add_barrier() - Adds a barrier to the transfer log
237 * @tconn:	DRBD connection.
238 * @new: Barrier to be added before the current head of the TL.
240 * The caller must hold the req_lock.
242 void _tl_add_barrier(struct drbd_tconn *tconn, struct drbd_tl_epoch *new)
244 struct drbd_tl_epoch *newest_before;
246 INIT_LIST_HEAD(&new->requests);
247 INIT_LIST_HEAD(&new->w.list);
248 new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
252 newest_before = tconn->newest_tle;
253 /* never send a barrier number == 0, because that is special-cased
254 * when using TCQ for our write ordering code */
255 new->br_number = (newest_before->br_number+1) ?: 1;
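/* (x ?: 1) is gcc's short-hand conditional: it yields x if x is non-zero
 * and 1 otherwise, so the barrier number simply skips 0 when it wraps. */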
256 if (tconn->newest_tle != new) {
257 tconn->newest_tle->next = new;
258 tconn->newest_tle = new;
263 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
264 * @tconn:	DRBD connection.
265 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
266 * @set_size: Expected number of requests before that barrier.
268 * In case the passed barrier_nr or set_size does not match the oldest
269 * &struct drbd_tl_epoch object, this function will cause a termination
272 void tl_release(struct drbd_tconn *tconn, unsigned int barrier_nr,
273 unsigned int set_size)
275 struct drbd_conf *mdev;
276 struct drbd_tl_epoch *b, *nob; /* next old barrier */
277 struct list_head *le, *tle;
278 struct drbd_request *r;
280 spin_lock_irq(&tconn->req_lock);
282 b = tconn->oldest_tle;
284 /* first some paranoia code */
286 conn_err(tconn, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
290 if (b->br_number != barrier_nr) {
291 conn_err(tconn, "BAD! BarrierAck #%u received, expected #%u!\n",
292 barrier_nr, b->br_number);
295 if (b->n_writes != set_size) {
296 conn_err(tconn, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
297 barrier_nr, set_size, b->n_writes);
301 /* Clean up list of requests processed during current epoch */
302 list_for_each_safe(le, tle, &b->requests) {
303 r = list_entry(le, struct drbd_request, tl_requests);
304 _req_mod(r, BARRIER_ACKED);
306 /* There could be requests on the list waiting for completion
307 of the write to the local disk. To avoid corruption of the
308 slab's data structures we have to remove the list's head.
310 Also there could have been a barrier ack out of sequence, overtaking
311 the write acks - which would be a bug and violate write ordering.
312 To not deadlock in case we lose the connection while such requests are
313 still pending, we need some way to find them for
314 _req_mod(CONNECTION_LOST_WHILE_PENDING).
316 These have been list_move'd to the out_of_sequence_requests list in
317 _req_mod(, BARRIER_ACKED) above.
319 list_del_init(&b->requests);
323 if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
324 _tl_add_barrier(tconn, b);
326 tconn->oldest_tle = nob;
327 /* if nob == NULL b was the only barrier, and becomes the new
328 barrier. Therefore tconn->oldest_tle already points to b */
330 D_ASSERT(nob != NULL);
331 tconn->oldest_tle = nob;
335 spin_unlock_irq(&tconn->req_lock);
336 dec_ap_pending(mdev);
341 spin_unlock_irq(&tconn->req_lock);
342 conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
347 * _tl_restart() - Walks the transfer log, and applies an action to all requests
348 * @tconn:	DRBD connection.
349 * @what: The action/event to perform with all request objects
351 * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
352 * RESTART_FROZEN_DISK_IO.
354 void _tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
356 struct drbd_tl_epoch *b, *tmp, **pn;
357 struct list_head *le, *tle, carry_reads;
358 struct drbd_request *req;
359 int rv, n_writes, n_reads;
361 b = tconn->oldest_tle;
362 pn = &tconn->oldest_tle;
366 INIT_LIST_HEAD(&carry_reads);
367 list_for_each_safe(le, tle, &b->requests) {
368 req = list_entry(le, struct drbd_request, tl_requests);
369 rv = _req_mod(req, what);
371 n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
372 n_reads += (rv & MR_READ) >> MR_READ_SHIFT;
377 if (what == RESEND) {
378 b->n_writes = n_writes;
379 if (b->w.cb == NULL) {
380 b->w.cb = w_send_barrier;
381 inc_ap_pending(b->w.mdev);
382 set_bit(CREATE_BARRIER, &b->w.mdev->flags);
385 drbd_queue_work(&tconn->data.work, &b->w);
390 list_add(&carry_reads, &b->requests);
391 /* there could still be requests on that ring list,
392 * in case local io is still pending */
393 list_del(&b->requests);
395 /* dec_ap_pending corresponding to queue_barrier.
396 * the newest barrier may not have been queued yet,
397 * in which case w.cb is still NULL. */
399 dec_ap_pending(b->w.mdev);
401 if (b == tconn->newest_tle) {
402 /* recycle, but reinit! */
404 conn_err(tconn, "ASSERT FAILED tmp == NULL");
405 INIT_LIST_HEAD(&b->requests);
406 list_splice(&carry_reads, &b->requests);
407 INIT_LIST_HEAD(&b->w.list);
409 b->br_number = net_random();
419 list_splice(&carry_reads, &b->requests);
425 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
426 * @tconn:	DRBD connection.
428 * This is called after the connection to the peer was lost. The storage covered
429 * by the requests on the transfer log gets marked as out of sync. Called from the
430 * receiver thread and the worker thread.
432 void tl_clear(struct drbd_tconn *tconn)
434 struct drbd_conf *mdev;
435 struct list_head *le, *tle;
436 struct drbd_request *r;
439 spin_lock_irq(&tconn->req_lock);
441 _tl_restart(tconn, CONNECTION_LOST_WHILE_PENDING);
443 /* we expect this list to be empty. */
444 if (!list_empty(&tconn->out_of_sequence_requests))
445 conn_err(tconn, "ASSERT FAILED list_empty(&out_of_sequence_requests)\n");
447 /* but just in case, clean it up anyways! */
448 list_for_each_safe(le, tle, &tconn->out_of_sequence_requests) {
449 r = list_entry(le, struct drbd_request, tl_requests);
450 /* It would be nice to complete outside of spinlock.
451 * But this is easier for now. */
452 _req_mod(r, CONNECTION_LOST_WHILE_PENDING);
455 /* ensure bit indicating barrier is required is clear */
457 idr_for_each_entry(&tconn->volumes, mdev, vnr)
458 clear_bit(CREATE_BARRIER, &mdev->flags);
461 spin_unlock_irq(&tconn->req_lock);
464 void tl_restart(struct drbd_tconn *tconn, enum drbd_req_event what)
466 spin_lock_irq(&tconn->req_lock);
467 _tl_restart(tconn, what);
468 spin_unlock_irq(&tconn->req_lock);
471 static int drbd_thread_setup(void *arg)
473 struct drbd_thread *thi = (struct drbd_thread *) arg;
474 struct drbd_tconn *tconn = thi->tconn;
478 snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
479 thi->name[0], thi->tconn->name);
482 retval = thi->function(thi);
484 spin_lock_irqsave(&thi->t_lock, flags);
486 /* if the receiver has been "EXITING", the last thing it did
487 * was set the conn state to "StandAlone",
488 * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
489 * and receiver thread will be "started".
490 * drbd_thread_start needs to set "RESTARTING" in that case.
491 * t_state check and assignment need to be within the same spinlock,
492 * so either thread_start sees EXITING, and can remap to RESTARTING,
493 * or thread_start sees NONE, and can proceed as normal.
496 if (thi->t_state == RESTARTING) {
497 conn_info(tconn, "Restarting %s thread\n", thi->name);
498 thi->t_state = RUNNING;
499 spin_unlock_irqrestore(&thi->t_lock, flags);
506 complete(&thi->stop);
507 spin_unlock_irqrestore(&thi->t_lock, flags);
509 conn_info(tconn, "Terminating %s\n", current->comm);
511 /* Release mod reference taken when thread was started */
513 kref_put(&tconn->kref, &conn_destroy);
514 module_put(THIS_MODULE);
518 static void drbd_thread_init(struct drbd_tconn *tconn, struct drbd_thread *thi,
519 int (*func) (struct drbd_thread *), char *name)
521 spin_lock_init(&thi->t_lock);
524 thi->function = func;
526 strncpy(thi->name, name, ARRAY_SIZE(thi->name));
529 int drbd_thread_start(struct drbd_thread *thi)
531 struct drbd_tconn *tconn = thi->tconn;
532 struct task_struct *nt;
535 /* is used from state engine doing drbd_thread_stop_nowait,
536 * while holding the req lock irqsave */
537 spin_lock_irqsave(&thi->t_lock, flags);
539 switch (thi->t_state) {
541 conn_info(tconn, "Starting %s thread (from %s [%d])\n",
542 thi->name, current->comm, current->pid);
544 /* Get ref on module for thread - this is released when thread exits */
545 if (!try_module_get(THIS_MODULE)) {
546 conn_err(tconn, "Failed to get module reference in drbd_thread_start\n");
547 spin_unlock_irqrestore(&thi->t_lock, flags);
551 kref_get(&thi->tconn->kref);
553 init_completion(&thi->stop);
554 thi->reset_cpu_mask = 1;
555 thi->t_state = RUNNING;
556 spin_unlock_irqrestore(&thi->t_lock, flags);
557 flush_signals(current); /* otherwise may get -ERESTARTNOINTR */
559 nt = kthread_create(drbd_thread_setup, (void *) thi,
560 "drbd_%c_%s", thi->name[0], thi->tconn->name);
563 conn_err(tconn, "Couldn't start thread\n");
565 kref_put(&tconn->kref, &conn_destroy);
566 module_put(THIS_MODULE);
569 spin_lock_irqsave(&thi->t_lock, flags);
571 thi->t_state = RUNNING;
572 spin_unlock_irqrestore(&thi->t_lock, flags);
576 thi->t_state = RESTARTING;
577 conn_info(tconn, "Restarting %s thread (from %s [%d])\n",
578 thi->name, current->comm, current->pid);
583 spin_unlock_irqrestore(&thi->t_lock, flags);
591 void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
595 enum drbd_thread_state ns = restart ? RESTARTING : EXITING;
597 /* may be called from state engine, holding the req lock irqsave */
598 spin_lock_irqsave(&thi->t_lock, flags);
600 if (thi->t_state == NONE) {
601 spin_unlock_irqrestore(&thi->t_lock, flags);
603 drbd_thread_start(thi);
607 if (thi->t_state != ns) {
608 if (thi->task == NULL) {
609 spin_unlock_irqrestore(&thi->t_lock, flags);
615 init_completion(&thi->stop);
616 if (thi->task != current)
617 force_sig(DRBD_SIGKILL, thi->task);
620 spin_unlock_irqrestore(&thi->t_lock, flags);
623 wait_for_completion(&thi->stop);
626 static struct drbd_thread *drbd_task_to_thread(struct drbd_tconn *tconn, struct task_struct *task)
628 struct drbd_thread *thi =
629 task == tconn->receiver.task ? &tconn->receiver :
630 task == tconn->asender.task ? &tconn->asender :
631 task == tconn->worker.task ? &tconn->worker : NULL;
636 char *drbd_task_to_thread_name(struct drbd_tconn *tconn, struct task_struct *task)
638 struct drbd_thread *thi = drbd_task_to_thread(tconn, task);
639 return thi ? thi->name : task->comm;
642 int conn_lowest_minor(struct drbd_tconn *tconn)
644 struct drbd_conf *mdev;
648 mdev = idr_get_next(&tconn->volumes, &vnr);
649 m = mdev ? mdev_to_minor(mdev) : -1;
657 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
658 * @tconn:	DRBD connection.
660 * Forces all threads of a device onto the same CPU. This is beneficial for
661 * DRBD's performance. May be overridden by the user's configuration.
663 void drbd_calc_cpu_mask(struct drbd_tconn *tconn)
668 if (cpumask_weight(tconn->cpu_mask))
671 ord = conn_lowest_minor(tconn) % cpumask_weight(cpu_online_mask);
672 for_each_online_cpu(cpu) {
674 cpumask_set_cpu(cpu, tconn->cpu_mask);
678 /* should not be reached */
679 cpumask_setall(tconn->cpu_mask);
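/* In short: the connection's lowest minor, modulo the number of online CPUs,
 * selects one CPU; all threads of this connection are then pinned to it by
 * drbd_thread_current_set_cpu() below.  The cpumask_setall() fallback only
 * matters if no online CPU matched, which should not happen. */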
683 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
685 * @thi: drbd_thread object
687 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
690 void drbd_thread_current_set_cpu(struct drbd_thread *thi)
692 struct task_struct *p = current;
694 if (!thi->reset_cpu_mask)
696 thi->reset_cpu_mask = 0;
697 set_cpus_allowed_ptr(p, thi->tconn->cpu_mask);
702 * drbd_header_size - size of a packet header
704 * The header size is a multiple of 8, so any payload following the header is
705 * word aligned on 64-bit architectures. (The bitmap send and receive code relies on this.)
708 unsigned int drbd_header_size(struct drbd_tconn *tconn)
710 if (tconn->agreed_pro_version >= 100) {
711 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
712 return sizeof(struct p_header100);
714 BUILD_BUG_ON(sizeof(struct p_header80) !=
715 sizeof(struct p_header95));
716 BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
717 return sizeof(struct p_header80);
721 static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
723 h->magic = cpu_to_be32(DRBD_MAGIC);
724 h->command = cpu_to_be16(cmd);
725 h->length = cpu_to_be16(size);
726 return sizeof(struct p_header80);
729 static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
731 h->magic = cpu_to_be16(DRBD_MAGIC_BIG);
732 h->command = cpu_to_be16(cmd);
733 h->length = cpu_to_be32(size);
734 return sizeof(struct p_header95);
737 static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
740 h->magic = cpu_to_be32(DRBD_MAGIC_100);
741 h->volume = cpu_to_be16(vnr);
742 h->command = cpu_to_be16(cmd);
743 h->length = cpu_to_be32(size);
745 return sizeof(struct p_header100);
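/* Header format selection, as implemented in prepare_header() below:
 * protocol 100 and later always use struct p_header100, which also carries
 * the volume number.  Older peers get the compact struct p_header80, except
 * that from protocol 95 on, payloads too large for its 16-bit length field
 * are sent with struct p_header95 and its 32-bit length instead. */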
748 static unsigned int prepare_header(struct drbd_tconn *tconn, int vnr,
749 void *buffer, enum drbd_packet cmd, int size)
751 if (tconn->agreed_pro_version >= 100)
752 return prepare_header100(buffer, cmd, size, vnr);
753 else if (tconn->agreed_pro_version >= 95 &&
754 size > DRBD_MAX_SIZE_H80_PACKET)
755 return prepare_header95(buffer, cmd, size);
757 return prepare_header80(buffer, cmd, size);
760 void *conn_prepare_command(struct drbd_tconn *tconn, struct drbd_socket *sock)
762 mutex_lock(&sock->mutex);
764 mutex_unlock(&sock->mutex);
767 return sock->sbuf + drbd_header_size(tconn);
770 void *drbd_prepare_command(struct drbd_conf *mdev, struct drbd_socket *sock)
772 return conn_prepare_command(mdev->tconn, sock);
775 static int __send_command(struct drbd_tconn *tconn, int vnr,
776 struct drbd_socket *sock, enum drbd_packet cmd,
777 unsigned int header_size, void *data,
784 * Called with @data == NULL and the size of the data blocks in @size
785 * for commands that send data blocks. For those commands, omit the
786 * MSG_MORE flag: this will increase the likelihood that data blocks
787 * which are page aligned on the sender will end up page aligned on the
790 msg_flags = data ? MSG_MORE : 0;
792 header_size += prepare_header(tconn, vnr, sock->sbuf, cmd,
794 err = drbd_send_all(tconn, sock->socket, sock->sbuf, header_size,
797 err = drbd_send_all(tconn, sock->socket, data, size, 0);
801 int conn_send_command(struct drbd_tconn *tconn, struct drbd_socket *sock,
802 enum drbd_packet cmd, unsigned int header_size,
803 void *data, unsigned int size)
807 err = __send_command(tconn, 0, sock, cmd, header_size, data, size);
808 mutex_unlock(&sock->mutex);
812 int drbd_send_command(struct drbd_conf *mdev, struct drbd_socket *sock,
813 enum drbd_packet cmd, unsigned int header_size,
814 void *data, unsigned int size)
818 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, header_size,
820 mutex_unlock(&sock->mutex);
824 int drbd_send_ping(struct drbd_tconn *tconn)
826 struct drbd_socket *sock;
829 if (!conn_prepare_command(tconn, sock))
831 return conn_send_command(tconn, sock, P_PING, 0, NULL, 0);
834 int drbd_send_ping_ack(struct drbd_tconn *tconn)
836 struct drbd_socket *sock;
839 if (!conn_prepare_command(tconn, sock))
841 return conn_send_command(tconn, sock, P_PING_ACK, 0, NULL, 0);
844 int drbd_send_sync_param(struct drbd_conf *mdev)
846 struct drbd_socket *sock;
847 struct p_rs_param_95 *p;
849 const int apv = mdev->tconn->agreed_pro_version;
850 enum drbd_packet cmd;
853 sock = &mdev->tconn->data;
854 p = drbd_prepare_command(mdev, sock);
859 nc = rcu_dereference(mdev->tconn->net_conf);
861 size = apv <= 87 ? sizeof(struct p_rs_param)
862 : apv == 88 ? sizeof(struct p_rs_param)
863 + strlen(nc->verify_alg) + 1
864 : apv <= 94 ? sizeof(struct p_rs_param_89)
865 : /* apv >= 95 */ sizeof(struct p_rs_param_95);
867 cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
869 /* initialize verify_alg and csums_alg */
870 memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
872 if (get_ldev(mdev)) {
873 p->rate = cpu_to_be32(mdev->ldev->dc.resync_rate);
874 p->c_plan_ahead = cpu_to_be32(mdev->ldev->dc.c_plan_ahead);
875 p->c_delay_target = cpu_to_be32(mdev->ldev->dc.c_delay_target);
876 p->c_fill_target = cpu_to_be32(mdev->ldev->dc.c_fill_target);
877 p->c_max_rate = cpu_to_be32(mdev->ldev->dc.c_max_rate);
880 p->rate = cpu_to_be32(DRBD_RATE_DEF);
881 p->c_plan_ahead = cpu_to_be32(DRBD_C_PLAN_AHEAD_DEF);
882 p->c_delay_target = cpu_to_be32(DRBD_C_DELAY_TARGET_DEF);
883 p->c_fill_target = cpu_to_be32(DRBD_C_FILL_TARGET_DEF);
884 p->c_max_rate = cpu_to_be32(DRBD_C_MAX_RATE_DEF);
888 strcpy(p->verify_alg, nc->verify_alg);
890 strcpy(p->csums_alg, nc->csums_alg);
893 return drbd_send_command(mdev, sock, cmd, size, NULL, 0);
896 int drbd_send_protocol(struct drbd_tconn *tconn)
898 struct drbd_socket *sock;
899 struct p_protocol *p;
904 p = conn_prepare_command(tconn, sock);
909 nc = rcu_dereference(tconn->net_conf);
911 if (nc->dry_run && tconn->agreed_pro_version < 92) {
913 mutex_unlock(&sock->mutex);
914 conn_err(tconn, "--dry-run is not supported by peer");
919 if (tconn->agreed_pro_version >= 87)
920 size += strlen(nc->integrity_alg) + 1;
922 p->protocol = cpu_to_be32(nc->wire_protocol);
923 p->after_sb_0p = cpu_to_be32(nc->after_sb_0p);
924 p->after_sb_1p = cpu_to_be32(nc->after_sb_1p);
925 p->after_sb_2p = cpu_to_be32(nc->after_sb_2p);
926 p->two_primaries = cpu_to_be32(nc->two_primaries);
932 p->conn_flags = cpu_to_be32(cf);
934 if (tconn->agreed_pro_version >= 87)
935 strcpy(p->integrity_alg, nc->integrity_alg);
938 return conn_send_command(tconn, sock, P_PROTOCOL, size, NULL, 0);
941 int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
943 struct drbd_socket *sock;
947 if (!get_ldev_if_state(mdev, D_NEGOTIATING))
950 sock = &mdev->tconn->data;
951 p = drbd_prepare_command(mdev, sock);
956 for (i = UI_CURRENT; i < UI_SIZE; i++)
957 p->uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
959 mdev->comm_bm_set = drbd_bm_total_weight(mdev);
960 p->uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
962 uuid_flags |= rcu_dereference(mdev->tconn->net_conf)->want_lose ? 1 : 0;
964 uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
965 uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
966 p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
969 return drbd_send_command(mdev, sock, P_UUIDS, sizeof(*p), NULL, 0);
972 int drbd_send_uuids(struct drbd_conf *mdev)
974 return _drbd_send_uuids(mdev, 0);
977 int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
979 return _drbd_send_uuids(mdev, 8);
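/* uuid_flags bits, as assembled in _drbd_send_uuids() above:
 *   1 - the "want_lose" (discard my data) setting from net_conf
 *   2 - we were a crashed primary
 *   4 - our disk was D_INCONSISTENT while negotiating
 *   8 - skip the initial sync, see drbd_send_uuids_skip_initial_sync() */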
982 void drbd_print_uuids(struct drbd_conf *mdev, const char *text)
984 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
985 u64 *uuid = mdev->ldev->md.uuid;
986 dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX\n",
988 (unsigned long long)uuid[UI_CURRENT],
989 (unsigned long long)uuid[UI_BITMAP],
990 (unsigned long long)uuid[UI_HISTORY_START],
991 (unsigned long long)uuid[UI_HISTORY_END]);
994 dev_info(DEV, "%s effective data uuid: %016llX\n",
996 (unsigned long long)mdev->ed_uuid);
1000 void drbd_gen_and_send_sync_uuid(struct drbd_conf *mdev)
1002 struct drbd_socket *sock;
1003 struct p_rs_uuid *p;
1006 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1008 uuid = mdev->ldev->md.uuid[UI_BITMAP] + UUID_NEW_BM_OFFSET;
1009 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1010 drbd_print_uuids(mdev, "updated sync UUID");
1013 sock = &mdev->tconn->data;
1014 p = drbd_prepare_command(mdev, sock);
1016 p->uuid = cpu_to_be64(uuid);
1017 drbd_send_command(mdev, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
1021 int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1023 struct drbd_socket *sock;
1025 sector_t d_size, u_size;
1026 int q_order_type, max_bio_size;
1028 if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1029 D_ASSERT(mdev->ldev->backing_bdev);
1030 d_size = drbd_get_max_capacity(mdev->ldev);
1031 u_size = mdev->ldev->dc.disk_size;
1032 q_order_type = drbd_queue_order_type(mdev);
1033 max_bio_size = queue_max_hw_sectors(mdev->ldev->backing_bdev->bd_disk->queue) << 9;
1034 max_bio_size = min_t(int, max_bio_size, DRBD_MAX_BIO_SIZE);
1039 q_order_type = QUEUE_ORDERED_NONE;
1040 max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
1043 sock = &mdev->tconn->data;
1044 p = drbd_prepare_command(mdev, sock);
1047 p->d_size = cpu_to_be64(d_size);
1048 p->u_size = cpu_to_be64(u_size);
1049 p->c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1050 p->max_bio_size = cpu_to_be32(max_bio_size);
1051 p->queue_order_type = cpu_to_be16(q_order_type);
1052 p->dds_flags = cpu_to_be16(flags);
1053 return drbd_send_command(mdev, sock, P_SIZES, sizeof(*p), NULL, 0);
1057 * drbd_send_state() - Sends the drbd state to the peer
1058 * @mdev: DRBD device.
1060 int drbd_send_state(struct drbd_conf *mdev)
1062 struct drbd_socket *sock;
1065 sock = &mdev->tconn->data;
1066 p = drbd_prepare_command(mdev, sock);
1069 p->state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1070 return drbd_send_command(mdev, sock, P_STATE, sizeof(*p), NULL, 0);
1073 int drbd_send_state_req(struct drbd_conf *mdev, union drbd_state mask, union drbd_state val)
1075 struct drbd_socket *sock;
1076 struct p_req_state *p;
1078 sock = &mdev->tconn->data;
1079 p = drbd_prepare_command(mdev, sock);
1082 p->mask = cpu_to_be32(mask.i);
1083 p->val = cpu_to_be32(val.i);
1084 return drbd_send_command(mdev, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
1088 int conn_send_state_req(struct drbd_tconn *tconn, union drbd_state mask, union drbd_state val)
1090 enum drbd_packet cmd;
1091 struct drbd_socket *sock;
1092 struct p_req_state *p;
1094 cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
1095 sock = &tconn->data;
1096 p = conn_prepare_command(tconn, sock);
1099 p->mask = cpu_to_be32(mask.i);
1100 p->val = cpu_to_be32(val.i);
1101 return conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
1104 void drbd_send_sr_reply(struct drbd_conf *mdev, enum drbd_state_rv retcode)
1106 struct drbd_socket *sock;
1107 struct p_req_state_reply *p;
1109 sock = &mdev->tconn->meta;
1110 p = drbd_prepare_command(mdev, sock);
1112 p->retcode = cpu_to_be32(retcode);
1113 drbd_send_command(mdev, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
1117 void conn_send_sr_reply(struct drbd_tconn *tconn, enum drbd_state_rv retcode)
1119 struct drbd_socket *sock;
1120 struct p_req_state_reply *p;
1121 enum drbd_packet cmd = tconn->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
1123 sock = &tconn->meta;
1124 p = conn_prepare_command(tconn, sock);
1126 p->retcode = cpu_to_be32(retcode);
1127 conn_send_command(tconn, sock, cmd, sizeof(*p), NULL, 0);
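/* Layout of the p_compressed_bm encoding byte, as manipulated by the
 * dcbp_*() helpers below:
 *   bits 0-3  encoding code (enum drbd_bitmap_code, e.g. RLE_VLI_Bits)
 *   bits 4-6  number of unused pad bits at the end of the code stream
 *   bit  7    whether the first run described is a run of set bits */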
1131 static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
1133 BUG_ON(code & ~0xf);
1134 p->encoding = (p->encoding & ~0xf) | code;
1137 static void dcbp_set_start(struct p_compressed_bm *p, int set)
1139 p->encoding = (p->encoding & ~0x80) | (set ? 0x80 : 0);
1142 static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
1145 p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
1148 int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1149 struct p_compressed_bm *p,
1151 struct bm_xfer_ctx *c)
1153 struct bitstream bs;
1154 unsigned long plain_bits;
1161 /* may we use this feature? */
1163 use_rle = rcu_dereference(mdev->tconn->net_conf)->use_rle;
1165 if (!use_rle || mdev->tconn->agreed_pro_version < 90)
1168 if (c->bit_offset >= c->bm_bits)
1169 return 0; /* nothing to do. */
1171 /* use at most this many bytes */
1172 bitstream_init(&bs, p->code, size, 0);
1173 memset(p->code, 0, size);
1174 /* plain bits covered in this code string */
1177 /* p->encoding & 0x80 stores whether the first run is a run of set bits.
1178 * bit offset is implicit.
1179 * start with toggle == 2 to be able to tell the first iteration */
1182 /* see how many plain bits we can stuff into one packet
1183 * using RLE and VLI. */
1185 tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
1186 : _drbd_bm_find_next(mdev, c->bit_offset);
1189 rl = tmp - c->bit_offset;
1191 if (toggle == 2) { /* first iteration */
1193 /* the first checked bit was set,
1194 * store start value, */
1195 dcbp_set_start(p, 1);
1196 /* but skip encoding of zero run length */
1200 dcbp_set_start(p, 0);
1203 /* paranoia: catch zero runlength.
1204 * can only happen if bitmap is modified while we scan it. */
1206 dev_err(DEV, "unexpected zero runlength while encoding bitmap "
1207 "t:%u bo:%lu\n", toggle, c->bit_offset);
1211 bits = vli_encode_bits(&bs, rl);
1212 if (bits == -ENOBUFS) /* buffer full */
1215 dev_err(DEV, "error while encoding bitmap: %d\n", bits);
1221 c->bit_offset = tmp;
1222 } while (c->bit_offset < c->bm_bits);
1224 len = bs.cur.b - p->code + !!bs.cur.bit;
1226 if (plain_bits < (len << 3)) {
1227 /* incompressible with this method.
1228 * we need to rewind both word and bit position. */
1229 c->bit_offset -= plain_bits;
1230 bm_xfer_ctx_bit_to_word_offset(c);
1231 c->bit_offset = c->word_offset * BITS_PER_LONG;
1235 /* RLE + VLI was able to compress it just fine.
1236 * update c->word_offset. */
1237 bm_xfer_ctx_bit_to_word_offset(c);
1239 /* store pad_bits */
1240 dcbp_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
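/* the RLE/VLI stream is bit granular; record how many bits of the last
 * byte are padding so the receiver knows where the code stream ends */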
1246 * send_bitmap_rle_or_plain
1248 * Return 0 when done, 1 when another iteration is needed, and a negative error
1249 * code upon failure.
1252 send_bitmap_rle_or_plain(struct drbd_conf *mdev, struct bm_xfer_ctx *c)
1254 struct drbd_socket *sock = &mdev->tconn->data;
1255 unsigned int header_size = drbd_header_size(mdev->tconn);
1256 struct p_compressed_bm *p = sock->sbuf + header_size;
1259 len = fill_bitmap_rle_bits(mdev, p,
1260 DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
1265 dcbp_set_code(p, RLE_VLI_Bits);
1266 err = __send_command(mdev->tconn, mdev->vnr, sock,
1267 P_COMPRESSED_BITMAP, sizeof(*p) + len,
1270 c->bytes[0] += header_size + sizeof(*p) + len;
1272 if (c->bit_offset >= c->bm_bits)
1275 /* was not compressible.
1276 * send a buffer full of plain text bits instead. */
1277 unsigned int data_size;
1278 unsigned long num_words;
1279 unsigned long *p = sock->sbuf + header_size;
1281 data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
1282 num_words = min_t(size_t, data_size / sizeof(*p),
1283 c->bm_words - c->word_offset);
1284 len = num_words * sizeof(*p);
1286 drbd_bm_get_lel(mdev, c->word_offset, num_words, p);
1287 err = __send_command(mdev->tconn, mdev->vnr, sock, P_BITMAP, len, NULL, 0);
1288 c->word_offset += num_words;
1289 c->bit_offset = c->word_offset * BITS_PER_LONG;
1292 c->bytes[1] += header_size + len;
1294 if (c->bit_offset > c->bm_bits)
1295 c->bit_offset = c->bm_bits;
1299 INFO_bm_xfer_stats(mdev, "send", c);
1307 /* See the comment at receive_bitmap() */
1308 static int _drbd_send_bitmap(struct drbd_conf *mdev)
1310 struct bm_xfer_ctx c;
1313 if (!expect(mdev->bitmap))
1316 if (get_ldev(mdev)) {
1317 if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
1318 dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
1319 drbd_bm_set_all(mdev);
1320 if (drbd_bm_write(mdev)) {
1321 /* write_bm did fail! Leave full sync flag set in Meta P_DATA
1322 * but otherwise process as per normal - need to tell other
1323 * side that a full resync is required! */
1324 dev_err(DEV, "Failed to write bitmap to disk!\n");
1326 drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
1333 c = (struct bm_xfer_ctx) {
1334 .bm_bits = drbd_bm_bits(mdev),
1335 .bm_words = drbd_bm_words(mdev),
1339 err = send_bitmap_rle_or_plain(mdev, &c);
1345 int drbd_send_bitmap(struct drbd_conf *mdev)
1347 struct drbd_socket *sock = &mdev->tconn->data;
1350 mutex_lock(&sock->mutex);
1352 err = !_drbd_send_bitmap(mdev);
1353 mutex_unlock(&sock->mutex);
1357 void drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
1359 struct drbd_socket *sock;
1360 struct p_barrier_ack *p;
1362 if (mdev->state.conn < C_CONNECTED)
1365 sock = &mdev->tconn->meta;
1366 p = drbd_prepare_command(mdev, sock);
1369 p->barrier = barrier_nr;
1370 p->set_size = cpu_to_be32(set_size);
1371 drbd_send_command(mdev, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
1375 * _drbd_send_ack() - Sends an ack packet
1376 * @mdev: DRBD device.
1377 * @cmd: Packet command code.
1378 * @sector: sector, needs to be in big endian byte order
1379 * @blksize: size in byte, needs to be in big endian byte order
1380 * @block_id: Id, big endian byte order
1382 static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1383 u64 sector, u32 blksize, u64 block_id)
1385 struct drbd_socket *sock;
1386 struct p_block_ack *p;
1388 if (mdev->state.conn < C_CONNECTED)
1391 sock = &mdev->tconn->meta;
1392 p = drbd_prepare_command(mdev, sock);
1396 p->block_id = block_id;
1397 p->blksize = blksize;
1398 p->seq_num = cpu_to_be32(atomic_inc_return(&mdev->packet_seq));
1399 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1402 /* dp->sector and dp->block_id already/still in network byte order,
1403 * data_size is payload size according to dp->head,
1404 * and may need to be corrected for digest size. */
1405 void drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packet cmd,
1406 struct p_data *dp, int data_size)
1408 data_size -= (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_r_tfm) ?
1409 crypto_hash_digestsize(mdev->tconn->integrity_r_tfm) : 0;
1410 _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
1414 void drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packet cmd,
1415 struct p_block_req *rp)
1417 _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
1421 * drbd_send_ack() - Sends an ack packet
1422 * @mdev: DRBD device
1423 * @cmd: packet command code
1424 * @peer_req: peer request
1426 int drbd_send_ack(struct drbd_conf *mdev, enum drbd_packet cmd,
1427 struct drbd_peer_request *peer_req)
1429 return _drbd_send_ack(mdev, cmd,
1430 cpu_to_be64(peer_req->i.sector),
1431 cpu_to_be32(peer_req->i.size),
1432 peer_req->block_id);
1435 /* This function misuses the block_id field to signal if the blocks
1436 * are in sync or not. */
1437 int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packet cmd,
1438 sector_t sector, int blksize, u64 block_id)
1440 return _drbd_send_ack(mdev, cmd,
1441 cpu_to_be64(sector),
1442 cpu_to_be32(blksize),
1443 cpu_to_be64(block_id));
1446 int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
1447 sector_t sector, int size, u64 block_id)
1449 struct drbd_socket *sock;
1450 struct p_block_req *p;
1452 sock = &mdev->tconn->data;
1453 p = drbd_prepare_command(mdev, sock);
1456 p->sector = cpu_to_be64(sector);
1457 p->block_id = block_id;
1458 p->blksize = cpu_to_be32(size);
1459 return drbd_send_command(mdev, sock, cmd, sizeof(*p), NULL, 0);
1462 int drbd_send_drequest_csum(struct drbd_conf *mdev, sector_t sector, int size,
1463 void *digest, int digest_size, enum drbd_packet cmd)
1465 struct drbd_socket *sock;
1466 struct p_block_req *p;
1468 /* FIXME: Put the digest into the preallocated socket buffer. */
1470 sock = &mdev->tconn->data;
1471 p = drbd_prepare_command(mdev, sock);
1474 p->sector = cpu_to_be64(sector);
1475 p->block_id = ID_SYNCER /* unused */;
1476 p->blksize = cpu_to_be32(size);
1477 return drbd_send_command(mdev, sock, cmd, sizeof(*p),
1478 digest, digest_size);
1481 int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
1483 struct drbd_socket *sock;
1484 struct p_block_req *p;
1486 sock = &mdev->tconn->data;
1487 p = drbd_prepare_command(mdev, sock);
1490 p->sector = cpu_to_be64(sector);
1491 p->block_id = ID_SYNCER /* unused */;
1492 p->blksize = cpu_to_be32(size);
1493 return drbd_send_command(mdev, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
1496 /* called on sndtimeo
1497 * returns false if we should retry,
1498 * true if we think the connection is dead
1500 static int we_should_drop_the_connection(struct drbd_tconn *tconn, struct socket *sock)
1503 /* long elapsed = (long)(jiffies - mdev->last_received); */
1505 drop_it = tconn->meta.socket == sock
1506 || !tconn->asender.task
1507 || get_t_state(&tconn->asender) != RUNNING
1508 || tconn->cstate < C_WF_REPORT_PARAMS;
1513 drop_it = !--tconn->ko_count;
1515 conn_err(tconn, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
1516 current->comm, current->pid, tconn->ko_count);
1517 request_ping(tconn);
1520 return drop_it; /* && (mdev->state == R_PRIMARY) */;
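/* Mark the connection as congested once the data socket's send queue exceeds
 * 4/5 of its send buffer.  The NET_CONGESTED flag is cleared again in
 * drbd_send() and _drbd_send_page() once a send on the data socket completed. */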
1523 static void drbd_update_congested(struct drbd_tconn *tconn)
1525 struct sock *sk = tconn->data.socket->sk;
1526 if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
1527 set_bit(NET_CONGESTED, &tconn->flags);
1530 /* The idea of sendpage seems to be to put some kind of reference
1531 * to the page into the skb, and to hand it over to the NIC. In
1532 * this process get_page() gets called.
1534 * As soon as the page was really sent over the network put_page()
1535 * gets called by some part of the network layer. [ NIC driver? ]
1537 * [ get_page() / put_page() increment/decrement the count. If count
1538 * reaches 0 the page will be freed. ]
1540 * This works nicely with pages from FSs.
1541 * But this means that in protocol A we might signal IO completion too early!
1543 * In order not to corrupt data during a resync we must make sure
1544 * that we do not reuse our own buffer pages (EEs) too early, therefore
1545 * we have the net_ee list.
1547 * XFS seems to have problems, still, it submits pages with page_count == 0!
1548 * As a workaround, we disable sendpage on pages
1549 * with page_count == 0 or PageSlab.
1551 static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
1552 int offset, size_t size, unsigned msg_flags)
1554 struct socket *socket;
1558 socket = mdev->tconn->data.socket;
1559 addr = kmap(page) + offset;
1560 err = drbd_send_all(mdev->tconn, socket, addr, size, msg_flags);
1563 mdev->send_cnt += size >> 9;
1567 static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
1568 int offset, size_t size, unsigned msg_flags)
1570 struct socket *socket = mdev->tconn->data.socket;
1571 mm_segment_t oldfs = get_fs();
1575 /* e.g. XFS meta- & log-data is in slab pages, which have a
1576 * page_count of 0 and/or have PageSlab() set.
1577 * we cannot use send_page for those, as that does get_page();
1578 * put_page(); and would cause either a VM_BUG directly, or
1579 * __page_cache_release a page that would actually still be referenced
1580 * by someone, leading to some obscure delayed Oops somewhere else. */
1581 if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
1582 return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
1584 msg_flags |= MSG_NOSIGNAL;
1585 drbd_update_congested(mdev->tconn);
1590 sent = socket->ops->sendpage(socket, page, offset, len, msg_flags);
1592 if (sent == -EAGAIN) {
1593 if (we_should_drop_the_connection(mdev->tconn, socket))
1597 dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
1598 __func__, (int)size, len, sent);
1605 } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
1607 clear_bit(NET_CONGESTED, &mdev->tconn->flags);
1611 mdev->send_cnt += size >> 9;
1616 static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
1618 struct bio_vec *bvec;
1620 /* hint all but last page with MSG_MORE */
1621 __bio_for_each_segment(bvec, bio, i, 0) {
1624 err = _drbd_no_send_page(mdev, bvec->bv_page,
1625 bvec->bv_offset, bvec->bv_len,
1626 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1633 static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
1635 struct bio_vec *bvec;
1637 /* hint all but last page with MSG_MORE */
1638 __bio_for_each_segment(bvec, bio, i, 0) {
1641 err = _drbd_send_page(mdev, bvec->bv_page,
1642 bvec->bv_offset, bvec->bv_len,
1643 i == bio->bi_vcnt - 1 ? 0 : MSG_MORE);
1650 static int _drbd_send_zc_ee(struct drbd_conf *mdev,
1651 struct drbd_peer_request *peer_req)
1653 struct page *page = peer_req->pages;
1654 unsigned len = peer_req->i.size;
1657 /* hint all but last page with MSG_MORE */
1658 page_chain_for_each(page) {
1659 unsigned l = min_t(unsigned, len, PAGE_SIZE);
1661 err = _drbd_send_page(mdev, page, 0, l,
1662 page_chain_next(page) ? MSG_MORE : 0);
1670 static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
1672 if (mdev->tconn->agreed_pro_version >= 95)
1673 return (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
1674 (bi_rw & REQ_FUA ? DP_FUA : 0) |
1675 (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
1676 (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
1678 return bi_rw & REQ_SYNC ? DP_RW_SYNC : 0;
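/* note: peers before protocol 95 only understand the SYNC hint on the wire;
 * FUA, FLUSH and DISCARD can only be communicated to newer peers */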
1681 /* Used to send write requests
1682 * R_PRIMARY -> Peer (P_DATA)
1684 int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
1686 struct drbd_socket *sock;
1688 unsigned int dp_flags = 0;
1692 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1693 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1695 sock = &mdev->tconn->data;
1696 p = drbd_prepare_command(mdev, sock);
1699 p->sector = cpu_to_be64(req->i.sector);
1700 p->block_id = (unsigned long)req;
1701 p->seq_num = cpu_to_be32(req->seq_num = atomic_inc_return(&mdev->packet_seq));
1702 dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
1703 if (mdev->state.conn >= C_SYNC_SOURCE &&
1704 mdev->state.conn <= C_PAUSED_SYNC_T)
1705 dp_flags |= DP_MAY_SET_IN_SYNC;
1706 if (mdev->tconn->agreed_pro_version >= 100) {
1707 if (req->rq_state & RQ_EXP_RECEIVE_ACK)
1708 dp_flags |= DP_SEND_RECEIVE_ACK;
1709 if (req->rq_state & RQ_EXP_WRITE_ACK)
1710 dp_flags |= DP_SEND_WRITE_ACK;
1712 p->dp_flags = cpu_to_be32(dp_flags);
1714 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, p + 1);
1715 err = __send_command(mdev->tconn, mdev->vnr, sock, P_DATA, sizeof(*p) + dgs, NULL, req->i.size);
1717 /* For protocol A, we have to memcpy the payload into
1718 * socket buffers, as we may complete right away
1719 * as soon as we handed it over to tcp, at which point the data
1720 * pages may become invalid.
1722 * With data integrity enabled, we copy it as well, so we can be
1723 * sure that even if the bio pages may still be modified, it
1724 * won't change the data on the wire, thus if the digest checks
1725 * out ok after sending on this side, but does not fit on the
1726 * receiving side, we sure have detected corruption elsewhere.
1728 if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || dgs)
1729 err = _drbd_send_bio(mdev, req->master_bio);
1731 err = _drbd_send_zc_bio(mdev, req->master_bio);
1733 /* double check digest, sometimes buffers have been modified in flight. */
1734 if (dgs > 0 && dgs <= 64) {
1735 /* 64 byte, 512 bit, is the largest digest size
1736 * currently supported in kernel crypto. */
1737 unsigned char digest[64];
1738 drbd_csum_bio(mdev, mdev->tconn->integrity_w_tfm, req->master_bio, digest);
1739 if (memcmp(p + 1, digest, dgs)) {
1741 "Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
1742 (unsigned long long)req->i.sector, req->i.size);
1744 } /* else if (dgs > 64) {
1745 ... Be noisy about digest too large ...
1748 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1753 /* answer packet, used to send data back for read requests:
1754 * Peer -> (diskless) R_PRIMARY (P_DATA_REPLY)
1755 * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
1757 int drbd_send_block(struct drbd_conf *mdev, enum drbd_packet cmd,
1758 struct drbd_peer_request *peer_req)
1760 struct drbd_socket *sock;
1765 dgs = (mdev->tconn->agreed_pro_version >= 87 && mdev->tconn->integrity_w_tfm) ?
1766 crypto_hash_digestsize(mdev->tconn->integrity_w_tfm) : 0;
1768 sock = &mdev->tconn->data;
1769 p = drbd_prepare_command(mdev, sock);
1772 p->sector = cpu_to_be64(peer_req->i.sector);
1773 p->block_id = peer_req->block_id;
1774 p->seq_num = 0; /* unused */
1776 drbd_csum_ee(mdev, mdev->tconn->integrity_w_tfm, peer_req, p + 1);
1777 err = __send_command(mdev->tconn, mdev->vnr, sock, cmd, sizeof(*p) + dgs, NULL, peer_req->i.size);
1779 err = _drbd_send_zc_ee(mdev, peer_req);
1780 mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
1785 int drbd_send_out_of_sync(struct drbd_conf *mdev, struct drbd_request *req)
1787 struct drbd_socket *sock;
1788 struct p_block_desc *p;
1790 sock = &mdev->tconn->data;
1791 p = drbd_prepare_command(mdev, sock);
1794 p->sector = cpu_to_be64(req->i.sector);
1795 p->blksize = cpu_to_be32(req->i.size);
1796 return drbd_send_command(mdev, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
1800 drbd_send distinguishes two cases:
1802 Packets sent via the data socket "sock"
1803 and packets sent via the meta data socket "msock"
                       sock                      msock
1806 -----------------+-------------------------+------------------------------
1807 timeout           conf.timeout / 2          conf.timeout / 2
1808 timeout action    send a ping via msock     Abort communication
1809                                             and close all sockets
1813 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
1815 int drbd_send(struct drbd_tconn *tconn, struct socket *sock,
1816 void *buf, size_t size, unsigned msg_flags)
1825 /* THINK if (signal_pending) return ... ? */
1830 msg.msg_name = NULL;
1831 msg.msg_namelen = 0;
1832 msg.msg_control = NULL;
1833 msg.msg_controllen = 0;
1834 msg.msg_flags = msg_flags | MSG_NOSIGNAL;
1836 if (sock == tconn->data.socket) {
1838 tconn->ko_count = rcu_dereference(tconn->net_conf)->ko_count;
1840 drbd_update_congested(tconn);
1844 * tcp_sendmsg does _not_ use its size parameter at all ?
1846 * -EAGAIN on timeout, -EINTR on signal.
1849 * do we need to block DRBD_SIG if sock == &meta.socket ??
1850 * otherwise wake_asender() might interrupt some send_*Ack !
1852 rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
1853 if (rv == -EAGAIN) {
1854 if (we_should_drop_the_connection(tconn, sock))
1860 flush_signals(current);
1868 } while (sent < size);
1870 if (sock == tconn->data.socket)
1871 clear_bit(NET_CONGESTED, &tconn->flags);
1874 if (rv != -EAGAIN) {
1875 conn_err(tconn, "%s_sendmsg returned %d\n",
1876 sock == tconn->meta.socket ? "msock" : "sock",
1878 conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
1880 conn_request_state(tconn, NS(conn, C_TIMEOUT), CS_HARD);
1887 * drbd_send_all - Send an entire buffer
1889 * Returns 0 upon success and a negative error value otherwise.
1891 int drbd_send_all(struct drbd_tconn *tconn, struct socket *sock, void *buffer,
1892 size_t size, unsigned msg_flags)
1896 err = drbd_send(tconn, sock, buffer, size, msg_flags);
1904 static int drbd_open(struct block_device *bdev, fmode_t mode)
1906 struct drbd_conf *mdev = bdev->bd_disk->private_data;
1907 unsigned long flags;
1910 mutex_lock(&drbd_main_mutex);
1911 spin_lock_irqsave(&mdev->tconn->req_lock, flags);
1912 /* to have a stable mdev->state.role
1913 * and no race with updating open_cnt */
1915 if (mdev->state.role != R_PRIMARY) {
1916 if (mode & FMODE_WRITE)
1918 else if (!allow_oos)
1924 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
1925 mutex_unlock(&drbd_main_mutex);
1930 static int drbd_release(struct gendisk *gd, fmode_t mode)
1932 struct drbd_conf *mdev = gd->private_data;
1933 mutex_lock(&drbd_main_mutex);
1935 mutex_unlock(&drbd_main_mutex);
1939 static void drbd_set_defaults(struct drbd_conf *mdev)
1941 /* Beware! The actual layout differs
1942 * between big endian and little endian */
1943 mdev->state = (union drbd_dev_state) {
1944 { .role = R_SECONDARY,
1946 .conn = C_STANDALONE,
1952 void drbd_init_set_defaults(struct drbd_conf *mdev)
1954 /* the memset(,0,) did most of this.
1955 * note: only assignments, no allocation in here */
1957 drbd_set_defaults(mdev);
1959 atomic_set(&mdev->ap_bio_cnt, 0);
1960 atomic_set(&mdev->ap_pending_cnt, 0);
1961 atomic_set(&mdev->rs_pending_cnt, 0);
1962 atomic_set(&mdev->unacked_cnt, 0);
1963 atomic_set(&mdev->local_cnt, 0);
1964 atomic_set(&mdev->pp_in_use_by_net, 0);
1965 atomic_set(&mdev->rs_sect_in, 0);
1966 atomic_set(&mdev->rs_sect_ev, 0);
1967 atomic_set(&mdev->ap_in_flight, 0);
1969 mutex_init(&mdev->md_io_mutex);
1970 mutex_init(&mdev->own_state_mutex);
1971 mdev->state_mutex = &mdev->own_state_mutex;
1973 spin_lock_init(&mdev->al_lock);
1974 spin_lock_init(&mdev->peer_seq_lock);
1975 spin_lock_init(&mdev->epoch_lock);
1977 INIT_LIST_HEAD(&mdev->active_ee);
1978 INIT_LIST_HEAD(&mdev->sync_ee);
1979 INIT_LIST_HEAD(&mdev->done_ee);
1980 INIT_LIST_HEAD(&mdev->read_ee);
1981 INIT_LIST_HEAD(&mdev->net_ee);
1982 INIT_LIST_HEAD(&mdev->resync_reads);
1983 INIT_LIST_HEAD(&mdev->resync_work.list);
1984 INIT_LIST_HEAD(&mdev->unplug_work.list);
1985 INIT_LIST_HEAD(&mdev->go_diskless.list);
1986 INIT_LIST_HEAD(&mdev->md_sync_work.list);
1987 INIT_LIST_HEAD(&mdev->start_resync_work.list);
1988 INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
1990 mdev->resync_work.cb = w_resync_timer;
1991 mdev->unplug_work.cb = w_send_write_hint;
1992 mdev->go_diskless.cb = w_go_diskless;
1993 mdev->md_sync_work.cb = w_md_sync;
1994 mdev->bm_io_work.w.cb = w_bitmap_io;
1995 mdev->start_resync_work.cb = w_start_resync;
1997 mdev->resync_work.mdev = mdev;
1998 mdev->unplug_work.mdev = mdev;
1999 mdev->go_diskless.mdev = mdev;
2000 mdev->md_sync_work.mdev = mdev;
2001 mdev->bm_io_work.w.mdev = mdev;
2002 mdev->start_resync_work.mdev = mdev;
2004 init_timer(&mdev->resync_timer);
2005 init_timer(&mdev->md_sync_timer);
2006 init_timer(&mdev->start_resync_timer);
2007 init_timer(&mdev->request_timer);
2008 mdev->resync_timer.function = resync_timer_fn;
2009 mdev->resync_timer.data = (unsigned long) mdev;
2010 mdev->md_sync_timer.function = md_sync_timer_fn;
2011 mdev->md_sync_timer.data = (unsigned long) mdev;
2012 mdev->start_resync_timer.function = start_resync_timer_fn;
2013 mdev->start_resync_timer.data = (unsigned long) mdev;
2014 mdev->request_timer.function = request_timer_fn;
2015 mdev->request_timer.data = (unsigned long) mdev;
2017 init_waitqueue_head(&mdev->misc_wait);
2018 init_waitqueue_head(&mdev->state_wait);
2019 init_waitqueue_head(&mdev->ee_wait);
2020 init_waitqueue_head(&mdev->al_wait);
2021 init_waitqueue_head(&mdev->seq_wait);
2023 /* mdev->tconn->agreed_pro_version gets initialized in drbd_connect() */
2024 mdev->write_ordering = WO_bdev_flush;
2025 mdev->resync_wenr = LC_FREE;
2026 mdev->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2027 mdev->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
2030 void drbd_mdev_cleanup(struct drbd_conf *mdev)
2033 if (mdev->tconn->receiver.t_state != NONE)
2034 dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2035 mdev->tconn->receiver.t_state);
2037 /* no need to lock it, I'm the only thread alive */
2038 if (atomic_read(&mdev->current_epoch->epoch_size) != 0)
2039 dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2049 mdev->rs_failed = 0;
2050 mdev->rs_last_events = 0;
2051 mdev->rs_last_sect_ev = 0;
2052 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2053 mdev->rs_mark_left[i] = 0;
2054 mdev->rs_mark_time[i] = 0;
2056 D_ASSERT(mdev->tconn->net_conf == NULL);
2058 drbd_set_my_capacity(mdev, 0);
2060 /* maybe never allocated. */
2061 drbd_bm_resize(mdev, 0, 1);
2062 drbd_bm_cleanup(mdev);
2065 drbd_free_bc(mdev->ldev);
2068 clear_bit(AL_SUSPENDED, &mdev->flags);
2070 D_ASSERT(list_empty(&mdev->active_ee));
2071 D_ASSERT(list_empty(&mdev->sync_ee));
2072 D_ASSERT(list_empty(&mdev->done_ee));
2073 D_ASSERT(list_empty(&mdev->read_ee));
2074 D_ASSERT(list_empty(&mdev->net_ee));
2075 D_ASSERT(list_empty(&mdev->resync_reads));
2076 D_ASSERT(list_empty(&mdev->tconn->data.work.q));
2077 D_ASSERT(list_empty(&mdev->tconn->meta.work.q));
2078 D_ASSERT(list_empty(&mdev->resync_work.list));
2079 D_ASSERT(list_empty(&mdev->unplug_work.list));
2080 D_ASSERT(list_empty(&mdev->go_diskless.list));
2082 drbd_set_defaults(mdev);
2086 static void drbd_destroy_mempools(void)
2090 while (drbd_pp_pool) {
2091 page = drbd_pp_pool;
2092 drbd_pp_pool = (struct page *)page_private(page);
2097 /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2099 if (drbd_md_io_bio_set)
2100 bioset_free(drbd_md_io_bio_set);
2101 if (drbd_md_io_page_pool)
2102 mempool_destroy(drbd_md_io_page_pool);
2103 if (drbd_ee_mempool)
2104 mempool_destroy(drbd_ee_mempool);
2105 if (drbd_request_mempool)
2106 mempool_destroy(drbd_request_mempool);
2108 kmem_cache_destroy(drbd_ee_cache);
2109 if (drbd_request_cache)
2110 kmem_cache_destroy(drbd_request_cache);
2111 if (drbd_bm_ext_cache)
2112 kmem_cache_destroy(drbd_bm_ext_cache);
2113 if (drbd_al_ext_cache)
2114 kmem_cache_destroy(drbd_al_ext_cache);
2116 drbd_md_io_bio_set = NULL;
2117 drbd_md_io_page_pool = NULL;
2118 drbd_ee_mempool = NULL;
2119 drbd_request_mempool = NULL;
2120 drbd_ee_cache = NULL;
2121 drbd_request_cache = NULL;
2122 drbd_bm_ext_cache = NULL;
2123 drbd_al_ext_cache = NULL;
static int drbd_create_mempools(void)
{
	struct page *page;
	const int number = (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count;
	int i;

	/* prepare our caches and mempools */
	drbd_request_mempool = NULL;
	drbd_ee_cache        = NULL;
	drbd_request_cache   = NULL;
	drbd_bm_ext_cache    = NULL;
	drbd_al_ext_cache    = NULL;
	drbd_pp_pool         = NULL;
	drbd_md_io_page_pool = NULL;
	drbd_md_io_bio_set   = NULL;

	/* caches */
	drbd_request_cache = kmem_cache_create(
		"drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
	if (drbd_request_cache == NULL)
		goto Enomem;

	drbd_ee_cache = kmem_cache_create(
		"drbd_ee", sizeof(struct drbd_peer_request), 0, 0, NULL);
	if (drbd_ee_cache == NULL)
		goto Enomem;

	drbd_bm_ext_cache = kmem_cache_create(
		"drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
	if (drbd_bm_ext_cache == NULL)
		goto Enomem;

	drbd_al_ext_cache = kmem_cache_create(
		"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
	if (drbd_al_ext_cache == NULL)
		goto Enomem;

	/* mempools */
	drbd_md_io_bio_set = bioset_create(DRBD_MIN_POOL_PAGES, 0);
	if (drbd_md_io_bio_set == NULL)
		goto Enomem;

	drbd_md_io_page_pool = mempool_create_page_pool(DRBD_MIN_POOL_PAGES, 0);
	if (drbd_md_io_page_pool == NULL)
		goto Enomem;

	drbd_request_mempool = mempool_create(number,
		mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
	if (drbd_request_mempool == NULL)
		goto Enomem;

	drbd_ee_mempool = mempool_create(number,
		mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
	if (drbd_ee_mempool == NULL)
		goto Enomem;

	/* drbd's page pool */
	spin_lock_init(&drbd_pp_lock);

	for (i = 0; i < number; i++) {
		page = alloc_page(GFP_HIGHUSER);
		if (!page)
			goto Enomem;
		set_page_private(page, (unsigned long)drbd_pp_pool);
		drbd_pp_pool = page;
	}
	drbd_pp_vacant = number;

	return 0;

Enomem:
	drbd_destroy_mempools(); /* in case we allocated some */
	return -ENOMEM;
}
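/*
 * Editorial note on the page pool built in drbd_create_mempools() above:
 * drbd_pp_pool is a singly linked list of struct page, chained through the
 * otherwise unused page_private() field, so no separate list structure is
 * needed.  A sketch of the matching "pop" operation (illustrative only; the
 * real consumer adds waitqueue handling and accounting on top):
 *
 *	static struct page *pp_pop_one(void)	// hypothetical helper name
 *	{
 *		struct page *page;
 *
 *		spin_lock(&drbd_pp_lock);
 *		page = drbd_pp_pool;
 *		if (page) {
 *			drbd_pp_pool = (struct page *)page_private(page);
 *			set_page_private(page, 0);
 *			drbd_pp_vacant--;
 *		}
 *		spin_unlock(&drbd_pp_lock);
 *		return page;
 *	}
 */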
static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
	void *unused)
{
	/* just so we have it. you never know what interesting things we
	 * might want to do here some day...
	 */

	return NOTIFY_DONE;
}

static struct notifier_block drbd_notifier = {
	.notifier_call = drbd_notify_sys,
};
static void drbd_release_all_peer_reqs(struct drbd_conf *mdev)
{
	int rr;

	rr = drbd_free_peer_reqs(mdev, &mdev->active_ee);
	if (rr)
		dev_err(DEV, "%d EEs in active list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->sync_ee);
	if (rr)
		dev_err(DEV, "%d EEs in sync list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->read_ee);
	if (rr)
		dev_err(DEV, "%d EEs in read list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->done_ee);
	if (rr)
		dev_err(DEV, "%d EEs in done list found!\n", rr);

	rr = drbd_free_peer_reqs(mdev, &mdev->net_ee);
	if (rr)
		dev_err(DEV, "%d EEs in net list found!\n", rr);
}
/* caution. no locking. */
void drbd_delete_device(struct drbd_conf *mdev)
{
	struct drbd_tconn *tconn = mdev->tconn;

	idr_remove(&mdev->tconn->volumes, mdev->vnr);
	idr_remove(&minors, mdev_to_minor(mdev));

	/* paranoia asserts */
	D_ASSERT(mdev->open_cnt == 0);
	D_ASSERT(list_empty(&mdev->tconn->data.work.q));
	/* end paranoia asserts */

	del_gendisk(mdev->vdisk);

	/* cleanup stuff that may have been allocated during
	 * device (re-)configuration or state changes */

	if (mdev->this_bdev)
		bdput(mdev->this_bdev);

	drbd_free_bc(mdev->ldev);
	mdev->ldev = NULL;

	drbd_release_all_peer_reqs(mdev);

	lc_destroy(mdev->act_log);
	lc_destroy(mdev->resync);

	kfree(mdev->p_uuid);
	/* mdev->p_uuid = NULL; */

	kfree(mdev->current_epoch);
	if (mdev->bitmap) /* should no longer be there. */
		drbd_bm_cleanup(mdev);
	__free_page(mdev->md_io_page);
	put_disk(mdev->vdisk);
	blk_cleanup_queue(mdev->rq_queue);
	kfree(mdev);

	kref_put(&tconn->kref, &conn_destroy);
}
static void drbd_cleanup(void)
{
	unsigned int i;
	struct drbd_conf *mdev;

	unregister_reboot_notifier(&drbd_notifier);

	/* first remove proc,
	 * drbdsetup uses its presence to detect
	 * whether DRBD is loaded.
	 * If we were to get stuck in proc removal,
	 * but have netlink already deregistered,
	 * some drbdsetup commands may wait forever
	 * for an answer.
	 */
	if (drbd_proc)
		remove_proc_entry("drbd", NULL);

	drbd_genl_unregister();

	down_write(&drbd_cfg_rwsem);
	idr_for_each_entry(&minors, mdev, i)
		drbd_delete_device(mdev);
	up_write(&drbd_cfg_rwsem);

	drbd_destroy_mempools();
	unregister_blkdev(DRBD_MAJOR, "drbd");

	idr_destroy(&minors);

	printk(KERN_INFO "drbd: module cleanup done.\n");
}
/**
 * drbd_congested() - Callback for pdflush
 * @congested_data:	User data
 * @bdi_bits:		Bits pdflush is currently interested in
 *
 * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
 */
static int drbd_congested(void *congested_data, int bdi_bits)
{
	struct drbd_conf *mdev = congested_data;
	struct request_queue *q;
	char reason = '-';
	int r = 0;

	if (!may_inc_ap_bio(mdev)) {
		/* DRBD has frozen IO */
		r = bdi_bits;
		reason = 'd';
		goto out;
	}

	if (get_ldev(mdev)) {
		q = bdev_get_queue(mdev->ldev->backing_bdev);
		r = bdi_congested(&q->backing_dev_info, bdi_bits);
		put_ldev(mdev);
		if (r)
			reason = 'b';
	}

	if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->tconn->flags)) {
		r |= (1 << BDI_async_congested);
		reason = reason == 'b' ? 'a' : 'n';
	}

out:
	mdev->congestion_reason = reason;
	return r;
}
static void drbd_init_workqueue(struct drbd_work_queue* wq)
{
	sema_init(&wq->s, 0);
	spin_lock_init(&wq->q_lock);
	INIT_LIST_HEAD(&wq->q);
}
struct drbd_tconn *conn_by_name(const char *name)
{
	struct drbd_tconn *tconn;

	if (!name || !name[0])
		return NULL;

	down_read(&drbd_cfg_rwsem);
	list_for_each_entry(tconn, &drbd_tconns, all_tconn) {
		if (!strcmp(tconn->name, name))
			goto found;
	}
	tconn = NULL;
found:
	up_read(&drbd_cfg_rwsem);
	return tconn;
}
static int drbd_alloc_socket(struct drbd_socket *socket)
{
	socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
	if (!socket->rbuf)
		return -ENOMEM;
	socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
	if (!socket->sbuf)
		return -ENOMEM;
	return 0;
}

static void drbd_free_socket(struct drbd_socket *socket)
{
	free_page((unsigned long) socket->sbuf);
	free_page((unsigned long) socket->rbuf);
}
void conn_free_crypto(struct drbd_tconn *tconn)
{
	drbd_free_sock(tconn);

	crypto_free_hash(tconn->csums_tfm);
	crypto_free_hash(tconn->verify_tfm);
	crypto_free_hash(tconn->cram_hmac_tfm);
	crypto_free_hash(tconn->integrity_w_tfm);
	crypto_free_hash(tconn->integrity_r_tfm);
	kfree(tconn->int_dig_in);
	kfree(tconn->int_dig_vv);

	tconn->csums_tfm = NULL;
	tconn->verify_tfm = NULL;
	tconn->cram_hmac_tfm = NULL;
	tconn->integrity_w_tfm = NULL;
	tconn->integrity_r_tfm = NULL;
	tconn->int_dig_in = NULL;
	tconn->int_dig_vv = NULL;
}
struct drbd_tconn *conn_create(const char *name)
{
	struct drbd_tconn *tconn;

	tconn = kzalloc(sizeof(struct drbd_tconn), GFP_KERNEL);
	if (!tconn)
		return NULL;

	tconn->name = kstrdup(name, GFP_KERNEL);
	if (!tconn->name)
		goto fail;

	if (drbd_alloc_socket(&tconn->data))
		goto fail;
	if (drbd_alloc_socket(&tconn->meta))
		goto fail;

	if (!zalloc_cpumask_var(&tconn->cpu_mask, GFP_KERNEL))
		goto fail;

	if (!tl_init(tconn))
		goto fail;

	tconn->cstate = C_STANDALONE;
	mutex_init(&tconn->cstate_mutex);
	spin_lock_init(&tconn->req_lock);
	mutex_init(&tconn->net_conf_update);
	init_waitqueue_head(&tconn->ping_wait);
	idr_init(&tconn->volumes);

	drbd_init_workqueue(&tconn->data.work);
	mutex_init(&tconn->data.mutex);

	drbd_init_workqueue(&tconn->meta.work);
	mutex_init(&tconn->meta.mutex);

	drbd_thread_init(tconn, &tconn->receiver, drbdd_init, "receiver");
	drbd_thread_init(tconn, &tconn->worker, drbd_worker, "worker");
	drbd_thread_init(tconn, &tconn->asender, drbd_asender, "asender");

	tconn->res_opts = (struct res_opts) {
		{}, 0, /* cpu_mask */
		DRBD_ON_NO_DATA_DEF, /* on_no_data */
	};

	down_write(&drbd_cfg_rwsem);
	kref_init(&tconn->kref);
	list_add_tail(&tconn->all_tconn, &drbd_tconns);
	up_write(&drbd_cfg_rwsem);

	return tconn;

fail:
	tl_cleanup(tconn);
	free_cpumask_var(tconn->cpu_mask);
	drbd_free_socket(&tconn->meta);
	drbd_free_socket(&tconn->data);
	kfree(tconn->name);
	kfree(tconn);
	return NULL;
}
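/*
 * Editorial note: conn_create() hands out the initial reference on the
 * connection object (kref_init above); each minor created on it takes
 * another reference (kref_get in conn_new_minor() below), and every user
 * eventually drops its reference with kref_put(&tconn->kref, &conn_destroy),
 * so conn_destroy() runs exactly once, after the last minor and the
 * configuration code are done with the connection.
 */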
void conn_destroy(struct kref *kref)
{
	struct drbd_tconn *tconn = container_of(kref, struct drbd_tconn, kref);

	idr_destroy(&tconn->volumes);

	free_cpumask_var(tconn->cpu_mask);
	drbd_free_socket(&tconn->meta);
	drbd_free_socket(&tconn->data);
	kfree(tconn->name);
	kfree(tconn->int_dig_in);
	kfree(tconn->int_dig_vv);
	kfree(tconn);
}
enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
{
	struct drbd_conf *mdev;
	struct gendisk *disk;
	struct request_queue *q;
	int vnr_got = vnr;
	int minor_got = minor;
	enum drbd_ret_code err = ERR_NOMEM;

	mdev = minor_to_mdev(minor);
	if (mdev)
		return ERR_MINOR_EXISTS;

	/* GFP_KERNEL, we are outside of all write-out paths */
	mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
	if (!mdev)
		return ERR_NOMEM;

	kref_get(&tconn->kref);
	mdev->tconn = tconn;

	mdev->minor = minor;
	mdev->vnr = vnr;

	drbd_init_set_defaults(mdev);

	q = blk_alloc_queue(GFP_KERNEL);
	if (!q)
		goto out_no_q;
	mdev->rq_queue = q;
	q->queuedata = mdev;

	disk = alloc_disk(1);
	if (!disk)
		goto out_no_disk;
	mdev->vdisk = disk;

	set_disk_ro(disk, true);

	disk->queue = q;
	disk->major = DRBD_MAJOR;
	disk->first_minor = minor;
	disk->fops = &drbd_ops;
	sprintf(disk->disk_name, "drbd%d", minor);
	disk->private_data = mdev;

	mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
	/* we have no partitions. we contain only ourselves. */
	mdev->this_bdev->bd_contains = mdev->this_bdev;

	q->backing_dev_info.congested_fn = drbd_congested;
	q->backing_dev_info.congested_data = mdev;

	blk_queue_make_request(q, drbd_make_request);
	/* Setting max_hw_sectors to an odd value of 8 KiB here
	   triggers a max_bio_size message upon first attach or connect */
	blk_queue_max_hw_sectors(q, DRBD_MAX_BIO_SIZE_SAFE >> 8);
	blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
	blk_queue_merge_bvec(q, drbd_merge_bvec);
	q->queue_lock = &mdev->tconn->req_lock; /* needed since we use */

	mdev->md_io_page = alloc_page(GFP_KERNEL);
	if (!mdev->md_io_page)
		goto out_no_io_page;

	if (drbd_bm_init(mdev))
		goto out_no_bitmap;
	mdev->read_requests = RB_ROOT;
	mdev->write_requests = RB_ROOT;

	mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
	if (!mdev->current_epoch)
		goto out_no_epoch;

	INIT_LIST_HEAD(&mdev->current_epoch->list);
	mdev->epochs = 1;

	if (!idr_pre_get(&minors, GFP_KERNEL))
		goto out_no_minor_idr;
	if (idr_get_new_above(&minors, mdev, minor, &minor_got))
		goto out_no_minor_idr;
	if (minor_got != minor) {
		err = ERR_MINOR_EXISTS;
		drbd_msg_put_info("requested minor exists already");
		goto out_idr_remove_minor;
	}

	if (!idr_pre_get(&tconn->volumes, GFP_KERNEL))
		goto out_idr_remove_minor;
	if (idr_get_new_above(&tconn->volumes, mdev, vnr, &vnr_got))
		goto out_idr_remove_minor;
	if (vnr_got != vnr) {
		err = ERR_INVALID_REQUEST;
		drbd_msg_put_info("requested volume exists already");
		goto out_idr_remove_vol;
	}
	add_disk(disk);

	/* inherit the connection state */
	mdev->state.conn = tconn->cstate;
	if (mdev->state.conn == C_WF_REPORT_PARAMS)
		drbd_connected(vnr, mdev, tconn);

	return NO_ERROR;

out_idr_remove_vol:
	idr_remove(&tconn->volumes, vnr_got);
out_idr_remove_minor:
	idr_remove(&minors, minor_got);
out_no_minor_idr:
	kfree(mdev->current_epoch);
out_no_epoch:
	drbd_bm_cleanup(mdev);
out_no_bitmap:
	__free_page(mdev->md_io_page);
out_no_io_page:
	put_disk(disk);
out_no_disk:
	blk_cleanup_queue(q);
out_no_q:
	kfree(mdev);
	kref_put(&tconn->kref, &conn_destroy);
	return err;
}
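/*
 * Editorial note: the out_* labels in conn_new_minor() above unwind strictly
 * in reverse order of allocation, so a failure at any step frees only what
 * was already set up before jumping to the label matching the last
 * successful step.
 */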
int __init drbd_init(void)
{
	int err;

	if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
		printk(KERN_ERR
		       "drbd: invalid minor_count (%d)\n", minor_count);
#ifdef MODULE
		return -EINVAL;
#else
		minor_count = DRBD_MINOR_COUNT_DEF;
#endif
	}

	err = register_blkdev(DRBD_MAJOR, "drbd");
	if (err) {
		printk(KERN_ERR
		       "drbd: unable to register block device major %d\n",
		       DRBD_MAJOR);
		return err;
	}

	err = drbd_genl_register();
	if (err) {
		printk(KERN_ERR "drbd: unable to register generic netlink family\n");
		goto fail;
	}

	register_reboot_notifier(&drbd_notifier);

	/*
	 * allocate all necessary structs
	 */
	err = -ENOMEM;

	init_waitqueue_head(&drbd_pp_wait);

	drbd_proc = NULL; /* play safe for drbd_cleanup */
	idr_init(&minors);

	err = drbd_create_mempools();
	if (err)
		goto fail;

	drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
	if (!drbd_proc) {
		printk(KERN_ERR "drbd: unable to register proc file\n");
		goto fail;
	}

	rwlock_init(&global_state_lock);
	INIT_LIST_HEAD(&drbd_tconns);

	printk(KERN_INFO "drbd: initialized. "
	       "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
	       API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
	printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
	printk(KERN_INFO "drbd: registered as block device major %d\n",
		DRBD_MAJOR);

	return 0; /* Success! */

fail:
	drbd_cleanup();
	if (err == -ENOMEM)
		/* currently always the case */
		printk(KERN_ERR "drbd: ran out of memory\n");
	else
		printk(KERN_ERR "drbd: initialization failure\n");
	return err;
}
void drbd_free_bc(struct drbd_backing_dev *ldev)
{
	if (ldev == NULL)
		return;

	blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
	blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);

	kfree(ldev);
}
void drbd_free_sock(struct drbd_tconn *tconn)
{
	if (tconn->data.socket) {
		mutex_lock(&tconn->data.mutex);
		kernel_sock_shutdown(tconn->data.socket, SHUT_RDWR);
		sock_release(tconn->data.socket);
		tconn->data.socket = NULL;
		mutex_unlock(&tconn->data.mutex);
	}
	if (tconn->meta.socket) {
		mutex_lock(&tconn->meta.mutex);
		kernel_sock_shutdown(tconn->meta.socket, SHUT_RDWR);
		sock_release(tconn->meta.socket);
		tconn->meta.socket = NULL;
		mutex_unlock(&tconn->meta.mutex);
	}
}
/* meta data management */

struct meta_data_on_disk {
	u64 la_size;           /* last agreed size. */
	u64 uuid[UI_SIZE];     /* UUIDs. */
	u64 device_uuid;
	u64 reserved_u64_1;
	u32 flags;             /* MDF */
	u32 magic;
	u32 md_size_sect;
	u32 al_offset;         /* offset to this block */
	u32 al_nr_extents;     /* important for restoring the AL */
	      /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
	u32 bm_offset;         /* offset to the bitmap, from here */
	u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
	u32 la_peer_max_bio_size;   /* last peer max_bio_size */
	u32 reserved_u32[3];

} __packed;
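/*
 * Editorial note: the superblock is written as a single 512 byte block (see
 * the memset() in drbd_md_sync() below) and all fields are stored big endian
 * on disk; drbd_md_sync() converts with cpu_to_be*() and drbd_md_read()
 * converts back with be*_to_cpu() before its sanity checks.
 */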
/**
 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
 * @mdev:	DRBD device.
 */
void drbd_md_sync(struct drbd_conf *mdev)
{
	struct meta_data_on_disk *buffer;
	sector_t sector;
	int i;

	del_timer(&mdev->md_sync_timer);
	/* timer may be rearmed by drbd_md_mark_dirty() now. */
	if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
		return;

	/* We use here D_FAILED and not D_ATTACHING because we try to write
	 * metadata even if we detach due to a disk failure! */
	if (!get_ldev_if_state(mdev, D_FAILED))
		return;

	mutex_lock(&mdev->md_io_mutex);
	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
	memset(buffer, 0, 512);

	buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
	for (i = UI_CURRENT; i < UI_SIZE; i++)
		buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
	buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
	buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);

	buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
	buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
	buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
	buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
	buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);

	buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
	buffer->la_peer_max_bio_size = cpu_to_be32(mdev->peer_max_bio_size);

	D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
	sector = mdev->ldev->md.md_offset;

	if (drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
		/* this was a try anyway ... */
		dev_err(DEV, "meta data update failed!\n");
		drbd_chk_io_error(mdev, 1, true);
	}

	/* Update mdev->ldev->md.la_size_sect,
	 * since we updated it on metadata. */
	mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);

	mutex_unlock(&mdev->md_io_mutex);
	put_ldev(mdev);
}
/**
 * drbd_md_read() - Reads in the meta data super block
 * @mdev:	DRBD device.
 * @bdev:	Device from which the meta data should be read in.
 *
 * Return 0 (NO_ERROR) on success, and an enum drbd_ret_code in case
 * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
 */
int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
{
	struct meta_data_on_disk *buffer;
	int i, rv = NO_ERROR;

	if (!get_ldev_if_state(mdev, D_ATTACHING))
		return ERR_IO_MD_DISK;

	mutex_lock(&mdev->md_io_mutex);
	buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);

	if (drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
		/* NOTE: can't do normal error processing here as this is
		   called BEFORE disk is attached */
		dev_err(DEV, "Error while reading metadata.\n");
		rv = ERR_IO_MD_DISK;
		goto err;
	}

	if (buffer->magic != cpu_to_be32(DRBD_MD_MAGIC)) {
		dev_err(DEV, "Error while reading metadata, magic not found.\n");
		rv = ERR_MD_INVALID;
		goto err;
	}
	if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
		dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
		    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
		rv = ERR_MD_INVALID;
		goto err;
	}
	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
		dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
		    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
		rv = ERR_MD_INVALID;
		goto err;
	}
	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
		dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
		    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
		rv = ERR_MD_INVALID;
		goto err;
	}

	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
		dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
		    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
		rv = ERR_MD_INVALID;
		goto err;
	}

	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
	for (i = UI_CURRENT; i < UI_SIZE; i++)
		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
	bdev->md.flags = be32_to_cpu(buffer->flags);
	bdev->dc.al_extents = be32_to_cpu(buffer->al_nr_extents);
	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);

	spin_lock_irq(&mdev->tconn->req_lock);
	if (mdev->state.conn < C_CONNECTED) {
		int peer;
		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
		peer = max_t(int, peer, DRBD_MAX_BIO_SIZE_SAFE);
		mdev->peer_max_bio_size = peer;
	}
	spin_unlock_irq(&mdev->tconn->req_lock);

	if (bdev->dc.al_extents < 7)
		bdev->dc.al_extents = 127;

 err:
	mutex_unlock(&mdev->md_io_mutex);
	put_ldev(mdev);

	return rv;
}
/**
 * drbd_md_mark_dirty() - Mark meta data super block as dirty
 * @mdev:	DRBD device.
 *
 * Call this function if you change anything that should be written to
 * the meta-data super block. This function sets MD_DIRTY, and starts a
 * timer that ensures that within five seconds you have to call drbd_md_sync().
 */
#ifdef DEBUG
void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
{
	if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
		mod_timer(&mdev->md_sync_timer, jiffies + HZ);
		mdev->last_md_mark_dirty.line = line;
		mdev->last_md_mark_dirty.func = func;
	}
}
#else
void drbd_md_mark_dirty(struct drbd_conf *mdev)
{
	if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
		mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
}
#endif
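/*
 * Editorial example (illustrative only): the usual pattern in this file is
 * to change the in-core meta data, mark it dirty, and let either the five
 * second timer or an explicit sync write it out, e.g.
 *
 *	drbd_md_set_flag(mdev, MDF_FULL_SYNC);	// sets the flag and marks dirty
 *	...					// any further in-core md changes
 *	drbd_md_sync(mdev);			// write the superblock out now
 */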
static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
{
	int i;

	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
		mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
}

void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
{
	if (idx == UI_CURRENT) {
		if (mdev->state.role == R_PRIMARY)
			val |= 1;
		else
			val &= ~((u64)1);

		drbd_set_ed_uuid(mdev, val);
	}

	mdev->ldev->md.uuid[idx] = val;
	drbd_md_mark_dirty(mdev);
}

void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
{
	if (mdev->ldev->md.uuid[idx]) {
		drbd_uuid_move_history(mdev);
		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
	}
	_drbd_uuid_set(mdev, idx, val);
}

/**
 * drbd_uuid_new_current() - Creates a new current UUID
 * @mdev:	DRBD device.
 *
 * Creates a new current UUID, and rotates the old current UUID into
 * the bitmap slot. Causes an incremental resync upon next connect.
 */
void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
{
	u64 val;
	unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];

	if (bm_uuid)
		dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);

	mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];

	get_random_bytes(&val, sizeof(u64));
	_drbd_uuid_set(mdev, UI_CURRENT, val);
	drbd_print_uuids(mdev, "new current UUID");
	/* get it to stable storage _now_ */
	drbd_md_sync(mdev);
}

void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
{
	if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
		return;

	if (val == 0) {
		drbd_uuid_move_history(mdev);
		mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
		mdev->ldev->md.uuid[UI_BITMAP] = 0;
	} else {
		unsigned long long bm_uuid = mdev->ldev->md.uuid[UI_BITMAP];
		if (bm_uuid)
			dev_warn(DEV, "bm UUID was already set: %llX\n", bm_uuid);

		mdev->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
	}
	drbd_md_mark_dirty(mdev);
}
/**
 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
 * @mdev:	DRBD device.
 *
 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
 */
int drbd_bmio_set_n_write(struct drbd_conf *mdev)
{
	int rv = -EIO;

	if (get_ldev_if_state(mdev, D_ATTACHING)) {
		drbd_md_set_flag(mdev, MDF_FULL_SYNC);
		drbd_md_sync(mdev);
		drbd_bm_set_all(mdev);

		rv = drbd_bm_write(mdev);

		if (!rv) {
			drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
			drbd_md_sync(mdev);
		}

		put_ldev(mdev);
	}

	return rv;
}

/**
 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
 * @mdev:	DRBD device.
 *
 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
 */
int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
{
	int rv = -EIO;

	drbd_resume_al(mdev);
	if (get_ldev_if_state(mdev, D_ATTACHING)) {
		drbd_bm_clear_all(mdev);
		rv = drbd_bm_write(mdev);
		put_ldev(mdev);
	}

	return rv;
}
static int w_bitmap_io(struct drbd_work *w, int unused)
{
	struct bm_io_work *work = container_of(w, struct bm_io_work, w);
	struct drbd_conf *mdev = w->mdev;
	int rv = -EIO;

	D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);

	if (get_ldev(mdev)) {
		drbd_bm_lock(mdev, work->why, work->flags);
		rv = work->io_fn(mdev);
		drbd_bm_unlock(mdev);
		put_ldev(mdev);
	}

	clear_bit_unlock(BITMAP_IO, &mdev->flags);
	wake_up(&mdev->misc_wait);

	if (work->done)
		work->done(mdev, rv);

	clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
	work->why = NULL;
	work->flags = 0;

	return 0;
}
void drbd_ldev_destroy(struct drbd_conf *mdev)
{
	lc_destroy(mdev->resync);
	mdev->resync = NULL;
	lc_destroy(mdev->act_log);
	mdev->act_log = NULL;
	__no_warn(local,
		drbd_free_bc(mdev->ldev);
		mdev->ldev = NULL;);

	clear_bit(GO_DISKLESS, &mdev->flags);
}

static int w_go_diskless(struct drbd_work *w, int unused)
{
	struct drbd_conf *mdev = w->mdev;

	D_ASSERT(mdev->state.disk == D_FAILED);
	/* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
	 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
	 * the protected members anymore, though, so once put_ldev reaches zero
	 * again, it will be safe to free them. */
	drbd_force_state(mdev, NS(disk, D_DISKLESS));
	return 0;
}

void drbd_go_diskless(struct drbd_conf *mdev)
{
	D_ASSERT(mdev->state.disk == D_FAILED);
	if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
		drbd_queue_work(&mdev->tconn->data.work, &mdev->go_diskless);
}
/**
 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
 * @mdev:	DRBD device.
 * @io_fn:	IO callback to be called when bitmap IO is possible
 * @done:	callback to be called after the bitmap IO was performed
 * @why:	Descriptive text of the reason for doing the IO
 *
 * While IO on the bitmap is in progress, application IO is frozen; this
 * ensures that drbd_set_out_of_sync() can not be called. This function MAY
 * ONLY be called from worker context. It MUST NOT be used while a previous
 * such work is still pending!
 */
void drbd_queue_bitmap_io(struct drbd_conf *mdev,
			  int (*io_fn)(struct drbd_conf *),
			  void (*done)(struct drbd_conf *, int),
			  char *why, enum bm_flag flags)
{
	D_ASSERT(current == mdev->tconn->worker.task);

	D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
	D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
	D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
	if (mdev->bm_io_work.why)
		dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
			why, mdev->bm_io_work.why);

	mdev->bm_io_work.io_fn = io_fn;
	mdev->bm_io_work.done = done;
	mdev->bm_io_work.why = why;
	mdev->bm_io_work.flags = flags;

	spin_lock_irq(&mdev->tconn->req_lock);
	set_bit(BITMAP_IO, &mdev->flags);
	if (atomic_read(&mdev->ap_bio_cnt) == 0) {
		if (!test_and_set_bit(BITMAP_IO_QUEUED, &mdev->flags))
			drbd_queue_work(&mdev->tconn->data.work, &mdev->bm_io_work.w);
	}
	spin_unlock_irq(&mdev->tconn->req_lock);
}
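/*
 * Editorial example (illustrative, with placeholder names): from worker
 * context a caller would queue a full "set all bits and write out" pass as
 *
 *	drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, my_done_cb,
 *			     "set_n_write from attaching", BM_LOCKED_MASK);
 *
 * where my_done_cb and the flags value are placeholders; the synchronous
 * drbd_bitmap_io() below takes the same io_fn but runs it directly.
 */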
/**
 * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
 * @mdev:	DRBD device.
 * @io_fn:	IO callback to be called when bitmap IO is possible
 * @why:	Descriptive text of the reason for doing the IO
 *
 * Freezes application IO while the actual IO operation runs. This
 * function MAY NOT be called from worker context.
 */
int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *),
		char *why, enum bm_flag flags)
{
	int rv;

	D_ASSERT(current != mdev->tconn->worker.task);

	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
		drbd_suspend_io(mdev);

	drbd_bm_lock(mdev, why, flags);
	rv = io_fn(mdev);
	drbd_bm_unlock(mdev);

	if ((flags & BM_LOCKED_SET_ALLOWED) == 0)
		drbd_resume_io(mdev);

	return rv;
}
void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
{
	if ((mdev->ldev->md.flags & flag) != flag) {
		drbd_md_mark_dirty(mdev);
		mdev->ldev->md.flags |= flag;
	}
}

void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
{
	if ((mdev->ldev->md.flags & flag) != 0) {
		drbd_md_mark_dirty(mdev);
		mdev->ldev->md.flags &= ~flag;
	}
}
int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
{
	return (bdev->md.flags & flag) != 0;
}
static void md_sync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work_front(&mdev->tconn->data.work, &mdev->md_sync_work);
}

static int w_md_sync(struct drbd_work *w, int unused)
{
	struct drbd_conf *mdev = w->mdev;

	dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
#ifdef DEBUG
	dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
		mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
#endif
	drbd_md_sync(mdev);
	return 0;
}
const char *cmdname(enum drbd_packet cmd)
{
	/* THINK may need to become several global tables
	 * when we want to support more than
	 * one PRO_VERSION */
	static const char *cmdnames[] = {
		[P_DATA]		= "Data",
		[P_DATA_REPLY]		= "DataReply",
		[P_RS_DATA_REPLY]	= "RSDataReply",
		[P_BARRIER]		= "Barrier",
		[P_BITMAP]		= "ReportBitMap",
		[P_BECOME_SYNC_TARGET]	= "BecomeSyncTarget",
		[P_BECOME_SYNC_SOURCE]	= "BecomeSyncSource",
		[P_UNPLUG_REMOTE]	= "UnplugRemote",
		[P_DATA_REQUEST]	= "DataRequest",
		[P_RS_DATA_REQUEST]	= "RSDataRequest",
		[P_SYNC_PARAM]		= "SyncParam",
		[P_SYNC_PARAM89]	= "SyncParam89",
		[P_PROTOCOL]		= "ReportProtocol",
		[P_UUIDS]		= "ReportUUIDs",
		[P_SIZES]		= "ReportSizes",
		[P_STATE]		= "ReportState",
		[P_SYNC_UUID]		= "ReportSyncUUID",
		[P_AUTH_CHALLENGE]	= "AuthChallenge",
		[P_AUTH_RESPONSE]	= "AuthResponse",
		[P_PING]		= "Ping",
		[P_PING_ACK]		= "PingAck",
		[P_RECV_ACK]		= "RecvAck",
		[P_WRITE_ACK]		= "WriteAck",
		[P_RS_WRITE_ACK]	= "RSWriteAck",
		[P_DISCARD_WRITE]	= "DiscardWrite",
		[P_NEG_ACK]		= "NegAck",
		[P_NEG_DREPLY]		= "NegDReply",
		[P_NEG_RS_DREPLY]	= "NegRSDReply",
		[P_BARRIER_ACK]		= "BarrierAck",
		[P_STATE_CHG_REQ]	= "StateChgRequest",
		[P_STATE_CHG_REPLY]	= "StateChgReply",
		[P_OV_REQUEST]		= "OVRequest",
		[P_OV_REPLY]		= "OVReply",
		[P_OV_RESULT]		= "OVResult",
		[P_CSUM_RS_REQUEST]	= "CsumRSRequest",
		[P_RS_IS_IN_SYNC]	= "CsumRSIsInSync",
		[P_COMPRESSED_BITMAP]	= "CBitmap",
		[P_DELAY_PROBE]		= "DelayProbe",
		[P_OUT_OF_SYNC]		= "OutOfSync",
		[P_RETRY_WRITE]		= "RetryWrite",
		[P_RS_CANCEL]		= "RSCancel",
		[P_CONN_ST_CHG_REQ]	= "conn_st_chg_req",
		[P_CONN_ST_CHG_REPLY]	= "conn_st_chg_reply",

		/* enum drbd_packet, but not commands - obsoleted flags:
		 *	P_MAY_IGNORE
		 *	P_MAX_OPT_CMD
		 */
	};

	/* too big for the array: 0xfffX */
	if (cmd == P_INITIAL_META)
		return "InitialMeta";
	if (cmd == P_INITIAL_DATA)
		return "InitialData";
	if (cmd == P_CONNECTION_FEATURES)
		return "ConnectionFeatures";
	if (cmd >= ARRAY_SIZE(cmdnames))
		return "Unknown";
	return cmdnames[cmd];
}
/**
 * drbd_wait_misc - wait for a request to make progress
 * @mdev:	device associated with the request
 * @i:		the struct drbd_interval embedded in struct drbd_request or
 *		struct drbd_peer_request
 */
int drbd_wait_misc(struct drbd_conf *mdev, struct drbd_interval *i)
{
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	long timeout;

	rcu_read_lock();
	nc = rcu_dereference(mdev->tconn->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -ETIMEDOUT;
	}
	timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
	rcu_read_unlock();

	/* Indicate to wake up mdev->misc_wait on progress. */
	i->waiting = true;
	prepare_to_wait(&mdev->misc_wait, &wait, TASK_INTERRUPTIBLE);
	spin_unlock_irq(&mdev->tconn->req_lock);
	timeout = schedule_timeout(timeout);
	finish_wait(&mdev->misc_wait, &wait);
	spin_lock_irq(&mdev->tconn->req_lock);
	if (!timeout || mdev->state.conn < C_CONNECTED)
		return -ETIMEDOUT;
	if (signal_pending(current))
		return -ERESTARTSYS;
	return 0;
}
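/*
 * Editorial note on drbd_wait_misc(): the nc->timeout * HZ / 10 conversion
 * above suggests the configured timeout is kept in tenths of a second (an
 * assumption based on the conversion factor, not stated in this file); with
 * ko_count == 0 the wait falls back to MAX_SCHEDULE_TIMEOUT, i.e. it only
 * returns on progress, disconnect, or a pending signal.
 */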
#ifdef CONFIG_DRBD_FAULT_INJECTION
/* Fault insertion support including random number generator shamelessly
 * stolen from kernel/rcutorture.c */
struct fault_random_state {
	unsigned long state;
	unsigned long count;
};

#define FAULT_RANDOM_MULT 39916801  /* prime */
#define FAULT_RANDOM_ADD 479001701 /* prime */
#define FAULT_RANDOM_REFRESH 10000

/*
 * Crude but fast random-number generator.  Uses a linear congruential
 * generator, with occasional help from get_random_bytes().
 */
static unsigned long
_drbd_fault_random(struct fault_random_state *rsp)
{
	unsigned long refresh;

	if (!rsp->count--) {
		get_random_bytes(&refresh, sizeof(refresh));
		rsp->state += refresh;
		rsp->count = FAULT_RANDOM_REFRESH;
	}
	rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
	return swahw32(rsp->state);
}

static char *
_drbd_fault_str(unsigned int type) {
	static char *_faults[] = {
		[DRBD_FAULT_MD_WR] = "Meta-data write",
		[DRBD_FAULT_MD_RD] = "Meta-data read",
		[DRBD_FAULT_RS_WR] = "Resync write",
		[DRBD_FAULT_RS_RD] = "Resync read",
		[DRBD_FAULT_DT_WR] = "Data write",
		[DRBD_FAULT_DT_RD] = "Data read",
		[DRBD_FAULT_DT_RA] = "Data read ahead",
		[DRBD_FAULT_BM_ALLOC] = "BM allocation",
		[DRBD_FAULT_AL_EE] = "EE allocation",
		[DRBD_FAULT_RECEIVE] = "receive data corruption",
	};

	return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
}

unsigned int
_drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
{
	static struct fault_random_state rrs = {0, 0};

	unsigned int ret = (
		(fault_devs == 0 ||
			((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
		(((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));

	if (ret) {
		fault_count++;

		if (__ratelimit(&drbd_ratelimit_state))
			dev_warn(DEV, "***Simulating %s failure\n",
				_drbd_fault_str(type));
	}

	return ret;
}
#endif
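/*
 * Editorial note on _drbd_insert_fault(): a fault fires when the device is
 * selected by the fault_devs bitmask (0 selects every device) and a pseudo
 * random draw in the range 1..100 comes out <= fault_rate, i.e. fault_rate
 * acts as a per-request percentage.
 */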
const char *drbd_buildtag(void)
{
	/* DRBD built from external sources has here a reference to the
	   git hash of the source code. */

	static char buildtag[38] = "\0uilt-in";

	if (buildtag[0] == 0) {
#ifdef CONFIG_MODULES
		if (THIS_MODULE != NULL)
			sprintf(buildtag, "srcversion: %-24s", THIS_MODULE->srcversion);
		else
#endif
			buildtag[0] = 'b';
	}

	return buildtag;
}
module_init(drbd_init)
module_exit(drbd_cleanup)

EXPORT_SYMBOL(drbd_conn_str);
EXPORT_SYMBOL(drbd_role_str);
EXPORT_SYMBOL(drbd_disk_str);
EXPORT_SYMBOL(drbd_set_st_err_str);