4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10 drbd is free software; you can redistribute it and/or modify
11 it under the terms of the GNU General Public License as published by
12 the Free Software Foundation; either version 2, or (at your option)
15 drbd is distributed in the hope that it will be useful,
16 but WITHOUT ANY WARRANTY; without even the implied warranty of
17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 GNU General Public License for more details.
20 You should have received a copy of the GNU General Public License
21 along with drbd; see the file COPYING. If not, write to
22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
39 #include "drbd_protocol.h"
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
46 * drbd_md_endio (defined here)
47 * drbd_request_endio (defined here)
48 * drbd_peer_request_endio (defined here)
49 * drbd_bm_endio (defined in drbd_bitmap.c)
51 * For all these callbacks, note the following:
52 * The callbacks will be called in irq context by the IDE drivers,
53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54 * Try to get the locking right :)
58 /* used for synchronous meta data and bitmap IO
59 * submitted by drbd_md_sync_page_io()
61 void drbd_md_endio(struct bio *bio)
63 struct drbd_device *device;
65 device = bio->bi_private;
66 device->md_io.error = bio->bi_error;
68 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69 * to timeout on the lower level device, and eventually detach from it.
70 * If this io completion runs after that timeout expired, this
71 * drbd_md_put_buffer() may allow us to finally try and re-attach.
72 * During normal operation, this only puts that extra reference
74 * Make sure we first drop the reference, and only then signal
75 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76 * next drbd_md_sync_page_io(), that we trigger the
77 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
79 drbd_md_put_buffer(device);
80 device->md_io.done = 1;
81 wake_up(&device->misc_wait);
83 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
87 /* reads on behalf of the partner,
88 * "submitted" by the receiver
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
92 unsigned long flags = 0;
93 struct drbd_peer_device *peer_device = peer_req->peer_device;
94 struct drbd_device *device = peer_device->device;
96 spin_lock_irqsave(&device->resource->req_lock, flags);
97 device->read_cnt += peer_req->i.size >> 9;
98 list_del(&peer_req->w.list);
99 if (list_empty(&device->read_ee))
100 wake_up(&device->ee_wait);
101 if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103 spin_unlock_irqrestore(&device->resource->req_lock, flags);
105 drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
109 /* writes on behalf of the partner, or resync writes,
110 * "submitted" by the receiver, final stage. */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
113 unsigned long flags = 0;
114 struct drbd_peer_device *peer_device = peer_req->peer_device;
115 struct drbd_device *device = peer_device->device;
116 struct drbd_connection *connection = peer_device->connection;
117 struct drbd_interval i;
120 int do_al_complete_io;
122 /* after we moved peer_req to done_ee,
123 * we may no longer access it,
124 * it may be freed/reused already!
125 * (as soon as we release the req_lock) */
127 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128 block_id = peer_req->block_id;
129 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
131 spin_lock_irqsave(&device->resource->req_lock, flags);
132 device->writ_cnt += peer_req->i.size >> 9;
133 list_move_tail(&peer_req->w.list, &device->done_ee);
136 * Do not remove from the write_requests tree here: we did not send the
137 * Ack yet and did not wake possibly waiting conflicting requests.
138 * Removal from the tree happens in "drbd_process_done_ee", within the
139 * appropriate dw.cb (e_end_block/e_end_resync_block), or in
140 * _drbd_clear_done_ee.
143 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
145 /* FIXME do we want to detach for failed REQ_DISCARD?
146 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147 if (peer_req->flags & EE_WAS_ERROR)
148 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
150 if (connection->cstate >= C_WF_REPORT_PARAMS) {
151 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153 kref_put(&device->kref, drbd_destroy_device);
155 spin_unlock_irqrestore(&device->resource->req_lock, flags);
157 if (block_id == ID_SYNCER)
158 drbd_rs_complete_io(device, i.sector);
161 wake_up(&device->ee_wait);
163 if (do_al_complete_io)
164 drbd_al_complete_io(device, &i);
169 /* writes on behalf of the partner, or resync writes,
170 * "submitted" by the receiver.
172 void drbd_peer_request_endio(struct bio *bio)
174 struct drbd_peer_request *peer_req = bio->bi_private;
175 struct drbd_device *device = peer_req->peer_device->device;
176 bool is_write = bio_data_dir(bio) == WRITE;
177 bool is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
179 if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180 drbd_warn(device, "%s: error=%d s=%llus\n",
181 is_write ? (is_discard ? "discard" : "write")
182 : "read", bio->bi_error,
183 (unsigned long long)peer_req->i.sector);
186 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188 bio_put(bio); /* no need for the bio anymore */
189 if (atomic_dec_and_test(&peer_req->pending_bios)) {
191 drbd_endio_write_sec_final(peer_req);
193 drbd_endio_read_sec_final(peer_req);
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
199 panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200 device->minor, device->resource->name, device->vnr);
203 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
205 void drbd_request_endio(struct bio *bio)
208 struct drbd_request *req = bio->bi_private;
209 struct drbd_device *device = req->device;
210 struct bio_and_error m;
211 enum drbd_req_event what;
213 /* If this request was aborted locally before,
214 * but now was completed "successfully",
215 * chances are that this caused arbitrary data corruption.
217 * "aborting" requests, or force-detaching the disk, is intended for
218 * completely blocked/hung local backing devices which no longer
219 * complete requests at all, not even with error completions. In this
220 * situation, usually a hard-reset and failover is the only way out.
222 * By "aborting", basically faking a local error-completion,
223 * we allow for a more graceful switchover by cleanly migrating services.
224 * Still the affected node has to be rebooted "soon".
226 * By completing these requests, we allow the upper layers to re-use
227 * the associated data pages.
229 * If later the local backing device "recovers", and now DMAs some data
230 * from disk into the original request pages, in the best case it will
231 * just put random data into unused pages; but typically it will corrupt
232 * meanwhile completely unrelated data, causing all sorts of damage.
234 * Which means delayed successful completion,
235 * especially for READ requests,
236 * is a reason to panic().
238 * We assume that a delayed *error* completion is OK,
239 * though we still will complain noisily about it.
241 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242 if (__ratelimit(&drbd_ratelimit_state))
243 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
246 drbd_panic_after_delayed_completion_of_aborted_request(device);
249 /* to avoid recursion in __req_mod */
250 if (unlikely(bio->bi_error)) {
251 if (bio_op(bio) == REQ_OP_DISCARD)
252 what = (bio->bi_error == -EOPNOTSUPP)
253 ? DISCARD_COMPLETED_NOTSUPP
254 : DISCARD_COMPLETED_WITH_ERROR;
256 what = (bio_data_dir(bio) == WRITE)
257 ? WRITE_COMPLETED_WITH_ERROR
258 : (bio_rw(bio) == READ)
259 ? READ_COMPLETED_WITH_ERROR
260 : READ_AHEAD_COMPLETED_WITH_ERROR;
264 bio_put(req->private_bio);
265 req->private_bio = ERR_PTR(bio->bi_error);
267 /* not req_mod(), we need irqsave here! */
268 spin_lock_irqsave(&device->resource->req_lock, flags);
269 __req_mod(req, what, &m);
270 spin_unlock_irqrestore(&device->resource->req_lock, flags);
274 complete_master_bio(device, &m);
277 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
279 AHASH_REQUEST_ON_STACK(req, tfm);
280 struct scatterlist sg;
281 struct page *page = peer_req->pages;
285 ahash_request_set_tfm(req, tfm);
286 ahash_request_set_callback(req, 0, NULL, NULL);
288 sg_init_table(&sg, 1);
289 crypto_ahash_init(req);
291 while ((tmp = page_chain_next(page))) {
292 /* all but the last page will be fully used */
293 sg_set_page(&sg, page, PAGE_SIZE, 0);
294 ahash_request_set_crypt(req, &sg, NULL, sg.length);
295 crypto_ahash_update(req);
298 /* and now the last, possibly only partially used page */
299 len = peer_req->i.size & (PAGE_SIZE - 1);
300 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
301 ahash_request_set_crypt(req, &sg, digest, sg.length);
302 crypto_ahash_finup(req);
303 ahash_request_zero(req);
306 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
308 AHASH_REQUEST_ON_STACK(req, tfm);
309 struct scatterlist sg;
311 struct bvec_iter iter;
313 ahash_request_set_tfm(req, tfm);
314 ahash_request_set_callback(req, 0, NULL, NULL);
316 sg_init_table(&sg, 1);
317 crypto_ahash_init(req);
319 bio_for_each_segment(bvec, bio, iter) {
320 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
321 ahash_request_set_crypt(req, &sg, NULL, sg.length);
322 crypto_ahash_update(req);
323 /* REQ_OP_WRITE_SAME has only one segment,
324 * checksum the payload only once. */
325 if (bio_op(bio) == REQ_OP_WRITE_SAME)
328 ahash_request_set_crypt(req, NULL, digest, 0);
329 crypto_ahash_final(req);
330 ahash_request_zero(req);
333 /* MAYBE merge common code with w_e_end_ov_req */
334 static int w_e_send_csum(struct drbd_work *w, int cancel)
336 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
337 struct drbd_peer_device *peer_device = peer_req->peer_device;
338 struct drbd_device *device = peer_device->device;
343 if (unlikely(cancel))
346 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
349 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
350 digest = kmalloc(digest_size, GFP_NOIO);
352 sector_t sector = peer_req->i.sector;
353 unsigned int size = peer_req->i.size;
354 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
355 /* Free peer_req and pages before send.
356 * In case we block on congestion, we could otherwise run into
357 * some distributed deadlock, if the other side blocks on
358 * congestion as well, because our receiver blocks in
359 * drbd_alloc_pages due to pp_in_use > max_buffers. */
360 drbd_free_peer_req(device, peer_req);
362 inc_rs_pending(device);
363 err = drbd_send_drequest_csum(peer_device, sector, size,
368 drbd_err(device, "kmalloc() of digest failed.\n");
374 drbd_free_peer_req(device, peer_req);
377 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
381 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
383 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
385 struct drbd_device *device = peer_device->device;
386 struct drbd_peer_request *peer_req;
388 if (!get_ldev(device))
391 /* GFP_TRY, because if there is no memory available right now, this may
392 * be rescheduled for later. It is "only" background resync, after all. */
393 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
394 size, size, GFP_TRY);
398 peer_req->w.cb = w_e_send_csum;
399 spin_lock_irq(&device->resource->req_lock);
400 list_add_tail(&peer_req->w.list, &device->read_ee);
401 spin_unlock_irq(&device->resource->req_lock);
403 atomic_add(size >> 9, &device->rs_sect_ev);
404 if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
405 DRBD_FAULT_RS_RD) == 0)
408 /* If it failed because of ENOMEM, retry should help. If it failed
409 * because bio_add_page failed (probably broken lower level driver),
410 * retry may or may not help.
411 * If it does not, you may need to force disconnect. */
412 spin_lock_irq(&device->resource->req_lock);
413 list_del(&peer_req->w.list);
414 spin_unlock_irq(&device->resource->req_lock);
416 drbd_free_peer_req(device, peer_req);
422 int w_resync_timer(struct drbd_work *w, int cancel)
424 struct drbd_device *device =
425 container_of(w, struct drbd_device, resync_work);
427 switch (device->state.conn) {
429 make_ov_request(device, cancel);
432 make_resync_request(device, cancel);
439 void resync_timer_fn(unsigned long data)
441 struct drbd_device *device = (struct drbd_device *) data;
443 drbd_queue_work_if_unqueued(
444 &first_peer_device(device)->connection->sender_work,
445 &device->resync_work);
448 static void fifo_set(struct fifo_buffer *fb, int value)
452 for (i = 0; i < fb->size; i++)
453 fb->values[i] = value;
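/* fifo_push() works ring-buffer style: it overwrites the oldest slot
 * (at head_index) with the new value, advances and wraps head_index,
 * and hands back the value that was overwritten, so the caller can
 * keep plan->total consistent. */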
456 static int fifo_push(struct fifo_buffer *fb, int value)
460 ov = fb->values[fb->head_index];
461 fb->values[fb->head_index++] = value;
463 if (fb->head_index >= fb->size)
469 static void fifo_add_val(struct fifo_buffer *fb, int value)
473 for (i = 0; i < fb->size; i++)
474 fb->values[i] += value;
477 struct fifo_buffer *fifo_alloc(int fifo_size)
479 struct fifo_buffer *fb;
481 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
486 fb->size = fifo_size;
492 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
494 struct disk_conf *dc;
495 unsigned int want; /* The number of sectors we want in-flight */
496 int req_sect; /* Number of sectors to request in this turn */
497 int correction; /* Number of sectors more we need in-flight */
498 int cps; /* correction per invocation of drbd_rs_controller() */
499 int steps; /* Number of time steps to plan ahead */
502 struct fifo_buffer *plan;
504 dc = rcu_dereference(device->ldev->disk_conf);
505 plan = rcu_dereference(device->rs_plan_s);
507 steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
509 if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
510 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
511 } else { /* normal path */
512 want = dc->c_fill_target ? dc->c_fill_target :
513 sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
516 correction = want - device->rs_in_flight - plan->total;
519 cps = correction / steps;
520 fifo_add_val(plan, cps);
521 plan->total += cps * steps;
523 /* What we do in this step */
524 curr_corr = fifo_push(plan, 0);
525 plan->total -= curr_corr;
527 req_sect = sect_in + curr_corr;
531 max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
532 if (req_sect > max_sect)
536 drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
537 sect_in, device->rs_in_flight, want, correction,
538 steps, cps, device->rs_planed, curr_corr, req_sect);
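/* Rough illustration of one controller step (numbers made up): with
 * want = 1000 sectors, rs_in_flight = 600 and plan->total = 100,
 * correction = 300; with steps = 10, cps = 30 is added to every plan
 * slot and 300 to plan->total.  The slot pushed out this turn
 * (curr_corr) is taken out of plan->total again and added on top of
 * sect_in to form req_sect, the amount to request in this turn. */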
544 static int drbd_rs_number_requests(struct drbd_device *device)
546 unsigned int sect_in; /* Number of sectors that came in since the last turn */
549 sect_in = atomic_xchg(&device->rs_sect_in, 0);
550 device->rs_in_flight -= sect_in;
553 mxb = drbd_get_max_buffers(device) / 2;
554 if (rcu_dereference(device->rs_plan_s)->size) {
555 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
556 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
558 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
559 number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
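/* For illustration, assuming SLEEP_TIME is 100ms (HZ/10): a configured
 * resync_rate of 400 (KiB/s) yields
 * number = (HZ/10) * 400 / (4 * HZ) = 10 requests of BM_BLOCK_SIZE
 * (4 KiB) per 100ms, i.e. 400 KiB/s. */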
563 /* Don't have more than "max-buffers"/2 in-flight.
564 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
565 * potentially causing a distributed deadlock on congestion during
566 * online-verify or (checksum-based) resync, if max-buffers,
567 * socket buffer sizes and resync rate settings are mis-configured. */
569 /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
570 * mxb (as used here, and in drbd_alloc_pages on the peer) is
571 * "number of pages" (typically also 4k),
572 * but "rs_in_flight" is in "sectors" (512 Byte). */
573 if (mxb - device->rs_in_flight/8 < number)
574 number = mxb - device->rs_in_flight/8;
579 static int make_resync_request(struct drbd_device *const device, int cancel)
581 struct drbd_peer_device *const peer_device = first_peer_device(device);
582 struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
585 const sector_t capacity = drbd_get_capacity(device->this_bdev);
587 int number, rollback_i, size;
588 int align, requeue = 0;
590 int discard_granularity = 0;
592 if (unlikely(cancel))
595 if (device->rs_total == 0) {
597 drbd_resync_finished(device);
601 if (!get_ldev(device)) {
602 /* Since we only need to access device->rsync, a
603 get_ldev_if_state(device, D_FAILED) would be sufficient, but
604 continuing resync with a broken disk makes no sense at all. */
606 drbd_err(device, "Disk broke down during resync!\n");
610 if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
612 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
616 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
617 number = drbd_rs_number_requests(device);
621 for (i = 0; i < number; i++) {
622 /* Stop generating RS requests when half of the send buffer is filled,
623 * but notify TCP that we'd like to have more space. */
624 mutex_lock(&connection->data.mutex);
625 if (connection->data.socket) {
626 struct sock *sk = connection->data.socket->sk;
627 int queued = sk->sk_wmem_queued;
628 int sndbuf = sk->sk_sndbuf;
629 if (queued > sndbuf / 2) {
632 set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
636 mutex_unlock(&connection->data.mutex);
641 size = BM_BLOCK_SIZE;
642 bit = drbd_bm_find_next(device, device->bm_resync_fo);
644 if (bit == DRBD_END_OF_BITMAP) {
645 device->bm_resync_fo = drbd_bm_bits(device);
650 sector = BM_BIT_TO_SECT(bit);
652 if (drbd_try_rs_begin_io(device, sector)) {
653 device->bm_resync_fo = bit;
656 device->bm_resync_fo = bit + 1;
658 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
659 drbd_rs_complete_io(device, sector);
663 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
664 /* try to find some adjacent bits.
665 * we stop if we already have the maximum request size.
667 * Additionally always align bigger requests, in order to
668 * be prepared for all stripe sizes of software RAIDs.
673 if (size + BM_BLOCK_SIZE > max_bio_size)
676 /* Be always aligned */
677 if (sector & ((1<<(align+3))-1))
680 if (discard_granularity && size == discard_granularity)
683 /* do not cross extent boundaries */
684 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
686 /* now, is it actually dirty, after all?
687 * caution, drbd_bm_test_bit is tri-state for some
688 * obscure reason; ( b == 0 ) would get the out-of-band
689 * only accidentally right because of the "oddly sized"
690 * adjustment below */
691 if (drbd_bm_test_bit(device, bit+1) != 1)
694 size += BM_BLOCK_SIZE;
695 if ((BM_BLOCK_SIZE << align) <= size)
699 /* if we merged some,
700 * reset the offset to start the next drbd_bm_find_next from */
701 if (size > BM_BLOCK_SIZE)
702 device->bm_resync_fo = bit + 1;
705 /* adjust very last sectors, in case we are oddly sized */
706 if (sector + (size>>9) > capacity)
707 size = (capacity-sector)<<9;
709 if (device->use_csums) {
710 switch (read_for_csum(peer_device, sector, size)) {
711 case -EIO: /* Disk failure */
714 case -EAGAIN: /* allocation failed, or ldev busy */
715 drbd_rs_complete_io(device, sector);
716 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
728 inc_rs_pending(device);
729 err = drbd_send_drequest(peer_device,
730 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
731 sector, size, ID_SYNCER);
733 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
734 dec_rs_pending(device);
741 if (device->bm_resync_fo >= drbd_bm_bits(device)) {
742 /* last syncer _request_ was sent,
743 * but the P_RS_DATA_REPLY not yet received. sync will end (and
744 * next sync group will resume), as soon as we receive the last
745 * resync data block, and the last bit is cleared.
746 * until then resync "work" is "inactive" ...
753 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
754 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
759 static int make_ov_request(struct drbd_device *device, int cancel)
763 const sector_t capacity = drbd_get_capacity(device->this_bdev);
764 bool stop_sector_reached = false;
766 if (unlikely(cancel))
769 number = drbd_rs_number_requests(device);
771 sector = device->ov_position;
772 for (i = 0; i < number; i++) {
773 if (sector >= capacity)
776 /* We check for "finished" only in the reply path:
777 * w_e_end_ov_reply().
778 * We need to send at least one request out. */
779 stop_sector_reached = i > 0
780 && verify_can_do_stop_sector(device)
781 && sector >= device->ov_stop_sector;
782 if (stop_sector_reached)
785 size = BM_BLOCK_SIZE;
787 if (drbd_try_rs_begin_io(device, sector)) {
788 device->ov_position = sector;
792 if (sector + (size>>9) > capacity)
793 size = (capacity-sector)<<9;
795 inc_rs_pending(device);
796 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
797 dec_rs_pending(device);
800 sector += BM_SECT_PER_BIT;
802 device->ov_position = sector;
805 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
806 if (i == 0 || !stop_sector_reached)
807 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
811 int w_ov_finished(struct drbd_work *w, int cancel)
813 struct drbd_device_work *dw =
814 container_of(w, struct drbd_device_work, w);
815 struct drbd_device *device = dw->device;
817 ov_out_of_sync_print(device);
818 drbd_resync_finished(device);
823 static int w_resync_finished(struct drbd_work *w, int cancel)
825 struct drbd_device_work *dw =
826 container_of(w, struct drbd_device_work, w);
827 struct drbd_device *device = dw->device;
830 drbd_resync_finished(device);
835 static void ping_peer(struct drbd_device *device)
837 struct drbd_connection *connection = first_peer_device(device)->connection;
839 clear_bit(GOT_PING_ACK, &connection->flags);
840 request_ping(connection);
841 wait_event(connection->ping_wait,
842 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
845 int drbd_resync_finished(struct drbd_device *device)
847 struct drbd_connection *connection = first_peer_device(device)->connection;
848 unsigned long db, dt, dbdt;
850 union drbd_state os, ns;
851 struct drbd_device_work *dw;
852 char *khelper_cmd = NULL;
855 /* Remove all elements from the resync LRU. Future actions
856 * might set bits in the (main) bitmap, and then the entries in the
857 * resync LRU would be wrong. */
858 if (drbd_rs_del_all(device)) {
859 /* In case this is not possible right now, most probably because
860 * there are P_RS_DATA_REPLY packets still lingering on the worker's
861 * queue (or the read operations for those packets have
862 * not finished yet). Retry in 100ms. */
864 schedule_timeout_interruptible(HZ / 10);
865 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
867 dw->w.cb = w_resync_finished;
869 drbd_queue_work(&connection->sender_work, &dw->w);
872 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
875 dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
879 db = device->rs_total;
880 /* adjust for verify start and stop sectors, respectively the reached position */
881 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
882 db -= device->ov_left;
884 dbdt = Bit2KB(db/dt);
885 device->rs_paused /= HZ;
887 if (!get_ldev(device))
892 spin_lock_irq(&device->resource->req_lock);
893 os = drbd_read_state(device);
895 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
897 /* This protects us against multiple calls (that can happen in the presence
898 of application IO), and against connectivity loss just before we arrive here. */
899 if (os.conn <= C_CONNECTED)
903 ns.conn = C_CONNECTED;
905 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
906 verify_done ? "Online verify" : "Resync",
907 dt + device->rs_paused, device->rs_paused, dbdt);
909 n_oos = drbd_bm_total_weight(device);
911 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
913 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
915 khelper_cmd = "out-of-sync";
918 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
920 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
921 khelper_cmd = "after-resync-target";
923 if (device->use_csums && device->rs_total) {
924 const unsigned long s = device->rs_same_csum;
925 const unsigned long t = device->rs_total;
928 (t < 100000) ? ((s*100)/t) : (s/(t/100));
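/* Both variants keep integer precision without overflowing: for small
 * totals (t < 100000) s*100 still fits an unsigned long comfortably,
 * so divide last; for large totals divide t first and accept the
 * small rounding error. */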
929 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
930 "transferred %luK total %luK\n",
932 Bit2KB(device->rs_same_csum),
933 Bit2KB(device->rs_total - device->rs_same_csum),
934 Bit2KB(device->rs_total));
938 if (device->rs_failed) {
939 drbd_info(device, " %lu failed blocks\n", device->rs_failed);
941 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
942 ns.disk = D_INCONSISTENT;
943 ns.pdsk = D_UP_TO_DATE;
945 ns.disk = D_UP_TO_DATE;
946 ns.pdsk = D_INCONSISTENT;
949 ns.disk = D_UP_TO_DATE;
950 ns.pdsk = D_UP_TO_DATE;
952 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
953 if (device->p_uuid) {
955 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
956 _drbd_uuid_set(device, i, device->p_uuid[i]);
957 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
958 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
960 drbd_err(device, "device->p_uuid is NULL! BUG\n");
964 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
965 /* for verify runs, we don't update uuids here,
966 * so there would be nothing to report. */
967 drbd_uuid_set_bm(device, 0UL);
968 drbd_print_uuids(device, "updated UUIDs");
969 if (device->p_uuid) {
970 /* Now the two UUID sets are equal, update what we
971 * know of the peer. */
973 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
974 device->p_uuid[i] = device->ldev->md.uuid[i];
979 _drbd_set_state(device, ns, CS_VERBOSE, NULL);
981 spin_unlock_irq(&device->resource->req_lock);
983 /* If we have been sync source, and have an effective fencing-policy,
984 * once *all* volumes are back in sync, call "unfence". */
985 if (os.conn == C_SYNC_SOURCE) {
986 enum drbd_disk_state disk_state = D_MASK;
987 enum drbd_disk_state pdsk_state = D_MASK;
988 enum drbd_fencing_p fp = FP_DONT_CARE;
991 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
992 if (fp != FP_DONT_CARE) {
993 struct drbd_peer_device *peer_device;
995 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
996 struct drbd_device *device = peer_device->device;
997 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
998 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1002 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1003 conn_khelper(connection, "unfence-peer");
1008 device->rs_total = 0;
1009 device->rs_failed = 0;
1010 device->rs_paused = 0;
1012 /* reset start sector, if we reached end of device */
1013 if (verify_done && device->ov_left == 0)
1014 device->ov_start_sector = 0;
1016 drbd_md_sync(device);
1019 drbd_khelper(device, khelper_cmd);
1025 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1027 if (drbd_peer_req_has_active_page(peer_req)) {
1028 /* This might happen if sendpage() has not finished */
1029 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1030 atomic_add(i, &device->pp_in_use_by_net);
1031 atomic_sub(i, &device->pp_in_use);
1032 spin_lock_irq(&device->resource->req_lock);
1033 list_add_tail(&peer_req->w.list, &device->net_ee);
1034 spin_unlock_irq(&device->resource->req_lock);
1035 wake_up(&drbd_pp_wait);
1037 drbd_free_peer_req(device, peer_req);
1041 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1043 * @cancel: The connection will be closed anyway
1045 int w_e_end_data_req(struct drbd_work *w, int cancel)
1047 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1048 struct drbd_peer_device *peer_device = peer_req->peer_device;
1049 struct drbd_device *device = peer_device->device;
1052 if (unlikely(cancel)) {
1053 drbd_free_peer_req(device, peer_req);
1054 dec_unacked(device);
1058 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1059 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1061 if (__ratelimit(&drbd_ratelimit_state))
1062 drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1063 (unsigned long long)peer_req->i.sector);
1065 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1068 dec_unacked(device);
1070 move_to_net_ee_or_free(device, peer_req);
1073 drbd_err(device, "drbd_send_block() failed\n");
1077 static bool all_zero(struct drbd_peer_request *peer_req)
1079 struct page *page = peer_req->pages;
1080 unsigned int len = peer_req->i.size;
1082 page_chain_for_each(page) {
1083 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1084 unsigned int i, words = l / sizeof(long);
1087 d = kmap_atomic(page);
1088 for (i = 0; i < words; i++) {
1102 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1104 * @cancel: The connection will be closed anyway
1106 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1108 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1109 struct drbd_peer_device *peer_device = peer_req->peer_device;
1110 struct drbd_device *device = peer_device->device;
1113 if (unlikely(cancel)) {
1114 drbd_free_peer_req(device, peer_req);
1115 dec_unacked(device);
1119 if (get_ldev_if_state(device, D_FAILED)) {
1120 drbd_rs_complete_io(device, peer_req->i.sector);
1124 if (device->state.conn == C_AHEAD) {
1125 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1126 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1127 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1128 inc_rs_pending(device);
1129 if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1130 err = drbd_send_rs_deallocated(peer_device, peer_req);
1132 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1134 if (__ratelimit(&drbd_ratelimit_state))
1135 drbd_err(device, "Not sending RSDataReply, "
1136 "partner DISKLESS!\n");
1140 if (__ratelimit(&drbd_ratelimit_state))
1141 drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1142 (unsigned long long)peer_req->i.sector);
1144 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1146 /* update resync data with failure */
1147 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1150 dec_unacked(device);
1152 move_to_net_ee_or_free(device, peer_req);
1155 drbd_err(device, "drbd_send_block() failed\n");
1159 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1161 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1162 struct drbd_peer_device *peer_device = peer_req->peer_device;
1163 struct drbd_device *device = peer_device->device;
1164 struct digest_info *di;
1166 void *digest = NULL;
1169 if (unlikely(cancel)) {
1170 drbd_free_peer_req(device, peer_req);
1171 dec_unacked(device);
1175 if (get_ldev(device)) {
1176 drbd_rs_complete_io(device, peer_req->i.sector);
1180 di = peer_req->digest;
1182 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1183 /* quick hack to try to avoid a race against reconfiguration.
1184 * a real fix would be much more involved,
1185 * introducing more locking mechanisms */
1186 if (peer_device->connection->csums_tfm) {
1187 digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1188 D_ASSERT(device, digest_size == di->digest_size);
1189 digest = kmalloc(digest_size, GFP_NOIO);
1192 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1193 eq = !memcmp(digest, di->digest, digest_size);
1198 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1199 /* rs_same_csums unit is BM_BLOCK_SIZE */
1200 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1201 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1203 inc_rs_pending(device);
1204 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1205 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1207 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1210 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1211 if (__ratelimit(&drbd_ratelimit_state))
1212 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1215 dec_unacked(device);
1216 move_to_net_ee_or_free(device, peer_req);
1219 drbd_err(device, "drbd_send_block/ack() failed\n");
1223 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1225 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1226 struct drbd_peer_device *peer_device = peer_req->peer_device;
1227 struct drbd_device *device = peer_device->device;
1228 sector_t sector = peer_req->i.sector;
1229 unsigned int size = peer_req->i.size;
1234 if (unlikely(cancel))
1237 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1238 digest = kmalloc(digest_size, GFP_NOIO);
1240 err = 1; /* terminate the connection in case the allocation failed */
1244 if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1245 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1247 memset(digest, 0, digest_size);
1249 /* Free e and pages before send.
1250 * In case we block on congestion, we could otherwise run into
1251 * some distributed deadlock, if the other side blocks on
1252 * congestion as well, because our receiver blocks in
1253 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1254 drbd_free_peer_req(device, peer_req);
1256 inc_rs_pending(device);
1257 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1259 dec_rs_pending(device);
1264 drbd_free_peer_req(device, peer_req);
1265 dec_unacked(device);
1269 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1271 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1272 device->ov_last_oos_size += size>>9;
1274 device->ov_last_oos_start = sector;
1275 device->ov_last_oos_size = size>>9;
1277 drbd_set_out_of_sync(device, sector, size);
1280 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1282 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1283 struct drbd_peer_device *peer_device = peer_req->peer_device;
1284 struct drbd_device *device = peer_device->device;
1285 struct digest_info *di;
1287 sector_t sector = peer_req->i.sector;
1288 unsigned int size = peer_req->i.size;
1291 bool stop_sector_reached = false;
1293 if (unlikely(cancel)) {
1294 drbd_free_peer_req(device, peer_req);
1295 dec_unacked(device);
1299 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1300 * the resync lru has been cleaned up already */
1301 if (get_ldev(device)) {
1302 drbd_rs_complete_io(device, peer_req->i.sector);
1306 di = peer_req->digest;
1308 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1309 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1310 digest = kmalloc(digest_size, GFP_NOIO);
1312 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1314 D_ASSERT(device, digest_size == di->digest_size);
1315 eq = !memcmp(digest, di->digest, digest_size);
1320 /* Free peer_req and pages before send.
1321 * In case we block on congestion, we could otherwise run into
1322 * some distributed deadlock, if the other side blocks on
1323 * congestion as well, because our receiver blocks in
1324 * drbd_alloc_pages due to pp_in_use > max_buffers. */
1325 drbd_free_peer_req(device, peer_req);
1327 drbd_ov_out_of_sync_found(device, sector, size);
1329 ov_out_of_sync_print(device);
1331 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1332 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1334 dec_unacked(device);
1338 /* let's advance progress step marks only for every other megabyte */
1339 if ((device->ov_left & 0x200) == 0x200)
1340 drbd_advance_rs_marks(device, device->ov_left);
1342 stop_sector_reached = verify_can_do_stop_sector(device) &&
1343 (sector + (size>>9)) >= device->ov_stop_sector;
1345 if (device->ov_left == 0 || stop_sector_reached) {
1346 ov_out_of_sync_print(device);
1347 drbd_resync_finished(device);
1354 * We need to track the number of pending barrier acks,
1355 * and to be able to wait for them.
1356 * See also comment in drbd_adm_attach before drbd_suspend_io.
1358 static int drbd_send_barrier(struct drbd_connection *connection)
1360 struct p_barrier *p;
1361 struct drbd_socket *sock;
1363 sock = &connection->data;
1364 p = conn_prepare_command(connection, sock);
1367 p->barrier = connection->send.current_epoch_nr;
1369 connection->send.current_epoch_writes = 0;
1370 connection->send.last_sent_barrier_jif = jiffies;
1372 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1375 int w_send_write_hint(struct drbd_work *w, int cancel)
1377 struct drbd_device *device =
1378 container_of(w, struct drbd_device, unplug_work);
1379 struct drbd_socket *sock;
1383 sock = &first_peer_device(device)->connection->data;
1384 if (!drbd_prepare_command(first_peer_device(device), sock))
1386 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1389 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1391 if (!connection->send.seen_any_write_yet) {
1392 connection->send.seen_any_write_yet = true;
1393 connection->send.current_epoch_nr = epoch;
1394 connection->send.current_epoch_writes = 0;
1395 connection->send.last_sent_barrier_jif = jiffies;
1399 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1401 /* re-init if first write on this connection */
1402 if (!connection->send.seen_any_write_yet)
1404 if (connection->send.current_epoch_nr != epoch) {
1405 if (connection->send.current_epoch_writes)
1406 drbd_send_barrier(connection);
1407 connection->send.current_epoch_nr = epoch;
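/* Note: a P_BARRIER is only sent when the epoch number actually
 * changes and the epoch being closed carried at least one write;
 * empty epochs are skipped silently. */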
1411 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1413 struct drbd_request *req = container_of(w, struct drbd_request, w);
1414 struct drbd_device *device = req->device;
1415 struct drbd_peer_device *const peer_device = first_peer_device(device);
1416 struct drbd_connection *const connection = peer_device->connection;
1419 if (unlikely(cancel)) {
1420 req_mod(req, SEND_CANCELED);
1423 req->pre_send_jif = jiffies;
1425 /* this time, no connection->send.current_epoch_writes++;
1426 * If it was sent, it was the closing barrier for the last
1427 * replicated epoch, before we went into AHEAD mode.
1428 * No more barriers will be sent, until we leave AHEAD mode again. */
1429 maybe_send_barrier(connection, req->epoch);
1431 err = drbd_send_out_of_sync(peer_device, req);
1432 req_mod(req, OOS_HANDED_TO_NETWORK);
1438 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1440 * @cancel: The connection will be closed anyway
1442 int w_send_dblock(struct drbd_work *w, int cancel)
1444 struct drbd_request *req = container_of(w, struct drbd_request, w);
1445 struct drbd_device *device = req->device;
1446 struct drbd_peer_device *const peer_device = first_peer_device(device);
1447 struct drbd_connection *connection = peer_device->connection;
1450 if (unlikely(cancel)) {
1451 req_mod(req, SEND_CANCELED);
1454 req->pre_send_jif = jiffies;
1456 re_init_if_first_write(connection, req->epoch);
1457 maybe_send_barrier(connection, req->epoch);
1458 connection->send.current_epoch_writes++;
1460 err = drbd_send_dblock(peer_device, req);
1461 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1467 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1469 * @cancel: The connection will be closed anyway
1471 int w_send_read_req(struct drbd_work *w, int cancel)
1473 struct drbd_request *req = container_of(w, struct drbd_request, w);
1474 struct drbd_device *device = req->device;
1475 struct drbd_peer_device *const peer_device = first_peer_device(device);
1476 struct drbd_connection *connection = peer_device->connection;
1479 if (unlikely(cancel)) {
1480 req_mod(req, SEND_CANCELED);
1483 req->pre_send_jif = jiffies;
1485 /* Even read requests may close a write epoch,
1486 * if there was any yet. */
1487 maybe_send_barrier(connection, req->epoch);
1489 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1490 (unsigned long)req);
1492 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1497 int w_restart_disk_io(struct drbd_work *w, int cancel)
1499 struct drbd_request *req = container_of(w, struct drbd_request, w);
1500 struct drbd_device *device = req->device;
1502 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1503 drbd_al_begin_io(device, &req->i);
1505 drbd_req_make_private_bio(req, req->master_bio);
1506 req->private_bio->bi_bdev = device->ldev->backing_bdev;
1507 generic_make_request(req->private_bio);
1512 static int _drbd_may_sync_now(struct drbd_device *device)
1514 struct drbd_device *odev = device;
1518 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1521 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1523 if (resync_after == -1)
1525 odev = minor_to_device(resync_after);
1528 if ((odev->state.conn >= C_SYNC_SOURCE &&
1529 odev->state.conn <= C_PAUSED_SYNC_T) ||
1530 odev->state.aftr_isp || odev->state.peer_isp ||
1531 odev->state.user_isp)
1537 * drbd_pause_after() - Pause resync on all devices that may not resync now
1538 * @device: DRBD device.
1540 * Called from process context only (admin command and after_state_ch).
1542 static bool drbd_pause_after(struct drbd_device *device)
1544 bool changed = false;
1545 struct drbd_device *odev;
1549 idr_for_each_entry(&drbd_devices, odev, i) {
1550 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1552 if (!_drbd_may_sync_now(odev) &&
1553 _drbd_set_state(_NS(odev, aftr_isp, 1),
1554 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1563 * drbd_resume_next() - Resume resync on all devices that may resync now
1564 * @device: DRBD device.
1566 * Called from process context only (admin command and worker).
1568 static bool drbd_resume_next(struct drbd_device *device)
1570 bool changed = false;
1571 struct drbd_device *odev;
1575 idr_for_each_entry(&drbd_devices, odev, i) {
1576 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1578 if (odev->state.aftr_isp) {
1579 if (_drbd_may_sync_now(odev) &&
1580 _drbd_set_state(_NS(odev, aftr_isp, 0),
1581 CS_HARD, NULL) != SS_NOTHING_TO_DO)
1589 void resume_next_sg(struct drbd_device *device)
1591 lock_all_resources();
1592 drbd_resume_next(device);
1593 unlock_all_resources();
1596 void suspend_other_sg(struct drbd_device *device)
1598 lock_all_resources();
1599 drbd_pause_after(device);
1600 unlock_all_resources();
1603 /* caller must lock_all_resources() */
1604 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1606 struct drbd_device *odev;
1611 if (o_minor < -1 || o_minor > MINORMASK)
1612 return ERR_RESYNC_AFTER;
1614 /* check for loops */
1615 odev = minor_to_device(o_minor);
1618 return ERR_RESYNC_AFTER_CYCLE;
1620 /* You are free to depend on diskless, non-existing,
1621 * or not yet/no longer existing minors.
1622 * We only reject dependency loops.
1623 * We cannot follow the dependency chain beyond a detached or
1626 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1630 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1632 /* dependency chain ends here, no cycles. */
1633 if (resync_after == -1)
1636 /* follow the dependency chain */
1637 odev = minor_to_device(resync_after);
1641 /* caller must lock_all_resources() */
1642 void drbd_resync_after_changed(struct drbd_device *device)
1647 changed = drbd_pause_after(device);
1648 changed |= drbd_resume_next(device);
1652 void drbd_rs_controller_reset(struct drbd_device *device)
1654 struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1655 struct fifo_buffer *plan;
1657 atomic_set(&device->rs_sect_in, 0);
1658 atomic_set(&device->rs_sect_ev, 0);
1659 device->rs_in_flight = 0;
1660 device->rs_last_events =
1661 (int)part_stat_read(&disk->part0, sectors[0]) +
1662 (int)part_stat_read(&disk->part0, sectors[1]);
1664 /* Updating the RCU protected object in place is necessary since
1665 this function gets called from atomic context.
1666 It is valid since all other updates also lead to a completely empty fifo. */
1669 plan = rcu_dereference(device->rs_plan_s);
1675 void start_resync_timer_fn(unsigned long data)
1677 struct drbd_device *device = (struct drbd_device *) data;
1678 drbd_device_post_work(device, RS_START);
1681 static void do_start_resync(struct drbd_device *device)
1683 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1684 drbd_warn(device, "postponing start_resync ...\n");
1685 device->start_resync_timer.expires = jiffies + HZ/10;
1686 add_timer(&device->start_resync_timer);
1690 drbd_start_resync(device, C_SYNC_SOURCE);
1691 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1694 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1696 bool csums_after_crash_only;
1698 csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1700 return connection->agreed_pro_version >= 89 && /* supported? */
1701 connection->csums_tfm && /* configured? */
1702 (csums_after_crash_only == false /* use for each resync? */
1703 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1707 * drbd_start_resync() - Start the resync process
1708 * @device: DRBD device.
1709 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET
1711 * This function might bring you directly into one of the
1712 * C_PAUSED_SYNC_* states.
1714 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1716 struct drbd_peer_device *peer_device = first_peer_device(device);
1717 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1718 union drbd_state ns;
1721 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1722 drbd_err(device, "Resync already running!\n");
1726 if (!test_bit(B_RS_H_DONE, &device->flags)) {
1727 if (side == C_SYNC_TARGET) {
1728 /* Since application IO was locked out during C_WF_BITMAP_T and
1729 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1730 we check whether we may make the data inconsistent. */
1731 r = drbd_khelper(device, "before-resync-target");
1732 r = (r >> 8) & 0xff;
1734 drbd_info(device, "before-resync-target handler returned %d, "
1735 "dropping connection.\n", r);
1736 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1739 } else /* C_SYNC_SOURCE */ {
1740 r = drbd_khelper(device, "before-resync-source");
1741 r = (r >> 8) & 0xff;
1744 drbd_info(device, "before-resync-source handler returned %d, "
1745 "ignoring. Old userland tools?", r);
1747 drbd_info(device, "before-resync-source handler returned %d, "
1748 "dropping connection.\n", r);
1749 conn_request_state(connection,
1750 NS(conn, C_DISCONNECTING), CS_HARD);
1757 if (current == connection->worker.task) {
1758 /* The worker should not sleep waiting for state_mutex,
1759 that can take long */
1760 if (!mutex_trylock(device->state_mutex)) {
1761 set_bit(B_RS_H_DONE, &device->flags);
1762 device->start_resync_timer.expires = jiffies + HZ/5;
1763 add_timer(&device->start_resync_timer);
1767 mutex_lock(device->state_mutex);
1770 lock_all_resources();
1771 clear_bit(B_RS_H_DONE, &device->flags);
1772 /* Did some connection breakage or IO error race with us? */
1773 if (device->state.conn < C_CONNECTED
1774 || !get_ldev_if_state(device, D_NEGOTIATING)) {
1775 unlock_all_resources();
1779 ns = drbd_read_state(device);
1781 ns.aftr_isp = !_drbd_may_sync_now(device);
1785 if (side == C_SYNC_TARGET)
1786 ns.disk = D_INCONSISTENT;
1787 else /* side == C_SYNC_SOURCE */
1788 ns.pdsk = D_INCONSISTENT;
1790 r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1791 ns = drbd_read_state(device);
1793 if (ns.conn < C_CONNECTED)
1794 r = SS_UNKNOWN_ERROR;
1796 if (r == SS_SUCCESS) {
1797 unsigned long tw = drbd_bm_total_weight(device);
1798 unsigned long now = jiffies;
1801 device->rs_failed = 0;
1802 device->rs_paused = 0;
1803 device->rs_same_csum = 0;
1804 device->rs_last_sect_ev = 0;
1805 device->rs_total = tw;
1806 device->rs_start = now;
1807 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1808 device->rs_mark_left[i] = tw;
1809 device->rs_mark_time[i] = now;
1811 drbd_pause_after(device);
1812 /* Forget potentially stale cached per resync extent bit-counts.
1813 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1814 * disabled, and know the disk state is ok. */
1815 spin_lock(&device->al_lock);
1816 lc_reset(device->resync);
1817 device->resync_locked = 0;
1818 device->resync_wenr = LC_FREE;
1819 spin_unlock(&device->al_lock);
1821 unlock_all_resources();
1823 if (r == SS_SUCCESS) {
1824 wake_up(&device->al_wait); /* for lc_reset() above */
1825 /* reset rs_last_bcast when a resync or verify is started,
1826 * to deal with potential jiffies wrap. */
1827 device->rs_last_bcast = jiffies - HZ;
1829 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1830 drbd_conn_str(ns.conn),
1831 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1832 (unsigned long) device->rs_total);
1833 if (side == C_SYNC_TARGET) {
1834 device->bm_resync_fo = 0;
1835 device->use_csums = use_checksum_based_resync(connection, device);
1837 device->use_csums = false;
1840 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1841 * with w_send_oos, or the sync target will get confused as to
1842 * how many bits to resync. We cannot always do that, because for an
1843 * empty resync and protocol < 95, we need to do it here, as we call
1844 * drbd_resync_finished from here in that case.
1845 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1846 * and from after_state_ch otherwise. */
1847 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1848 drbd_gen_and_send_sync_uuid(peer_device);
1850 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1851 /* This still has a race (about when exactly the peers
1852 * detect connection loss) that can lead to a full sync
1853 * on next handshake. In 8.3.9 we fixed this with explicit
1854 * resync-finished notifications, but the fix
1855 * introduces a protocol change. Sleeping for some
1856 * time longer than the ping interval + timeout on the
1857 * SyncSource, to give the SyncTarget the chance to
1858 * detect connection loss, then waiting for a ping
1859 * response (implicit in drbd_resync_finished) reduces
1860 * the race considerably, but does not solve it. */
1861 if (side == C_SYNC_SOURCE) {
1862 struct net_conf *nc;
1866 nc = rcu_dereference(connection->net_conf);
1867 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1869 schedule_timeout_interruptible(timeo);
1871 drbd_resync_finished(device);
1874 drbd_rs_controller_reset(device);
1875 /* ns.conn may already be != device->state.conn,
1876 * we may have been paused in between, or become paused until
1877 * the timer triggers.
1878 * No matter, that is handled in resync_timer_fn() */
1879 if (ns.conn == C_SYNC_TARGET)
1880 mod_timer(&device->resync_timer, jiffies);
1882 drbd_md_sync(device);
1886 mutex_unlock(device->state_mutex);
1889 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1891 struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1892 device->rs_last_bcast = jiffies;
1894 if (!get_ldev(device))
1897 drbd_bm_write_lazy(device, 0);
1898 if (resync_done && is_sync_state(device->state.conn))
1899 drbd_resync_finished(device);
1901 drbd_bcast_event(device, &sib);
1902 /* update timestamp, in case it took a while to write out stuff */
1903 device->rs_last_bcast = jiffies;
1907 static void drbd_ldev_destroy(struct drbd_device *device)
1909 lc_destroy(device->resync);
1910 device->resync = NULL;
1911 lc_destroy(device->act_log);
1912 device->act_log = NULL;
1915 drbd_backing_dev_free(device, device->ldev);
1916 device->ldev = NULL;
1919 clear_bit(GOING_DISKLESS, &device->flags);
1920 wake_up(&device->misc_wait);
1923 static void go_diskless(struct drbd_device *device)
1925 D_ASSERT(device, device->state.disk == D_FAILED);
1926 /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1927 * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1928 * the protected members anymore, though, so once put_ldev reaches zero
1929 * again, it will be safe to free them. */
1931 /* Try to write changed bitmap pages, read errors may have just
1932 * set some bits outside the area covered by the activity log.
1934 * If we have an IO error during the bitmap writeout,
1935 * we will want a full sync next time, just in case.
1936 * (Do we want a specific meta data flag for this?)
1938 * If that does not make it to stable storage either,
1939 * we cannot do anything about that anymore.
1941 * We still need to check if both bitmap and ldev are present, we may
1942 * end up here after a failed attach, before ldev was even assigned.
1944 if (device->bitmap && device->ldev) {
1945 /* An interrupted resync or similar is allowed to recount bits
1947 * Any modifications would not be expected anymore, though.
1949 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1950 "detach", BM_LOCKED_TEST_ALLOWED)) {
1951 if (test_bit(WAS_READ_ERROR, &device->flags)) {
1952 drbd_md_set_flag(device, MDF_FULL_SYNC);
1953 drbd_md_sync(device);
1958 drbd_force_state(device, NS(disk, D_DISKLESS));
1961 static int do_md_sync(struct drbd_device *device)
1963 drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1964 drbd_md_sync(device);
1968 /* only called from drbd_worker thread, no locking */
1969 void __update_timing_details(
1970 struct drbd_thread_timing_details *tdp,
1971 unsigned int *cb_nr,
1973 const char *fn, const unsigned int line)
1975 unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1976 struct drbd_thread_timing_details *td = tdp + i;
1978 td->start_jif = jiffies;
1984 i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1986 memset(td, 0, sizeof(*td));
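/* The samples are kept in a small ring of DRBD_THREAD_DETAILS_HIST
 * entries; the slot following the one just filled is zeroed,
 * presumably so a reader of the history can tell where the newest
 * entry ends. */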
1991 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1993 if (test_bit(MD_SYNC, &todo))
1995 if (test_bit(RS_DONE, &todo) ||
1996 test_bit(RS_PROGRESS, &todo))
1997 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1998 if (test_bit(GO_DISKLESS, &todo))
1999 go_diskless(device);
2000 if (test_bit(DESTROY_DISK, &todo))
2001 drbd_ldev_destroy(device);
2002 if (test_bit(RS_START, &todo))
2003 do_start_resync(device);
2006 #define DRBD_DEVICE_WORK_MASK \
2007 ((1UL << GO_DISKLESS) \
2008 |(1UL << DESTROY_DISK) \
2010 |(1UL << RS_START) \
2011 |(1UL << RS_PROGRESS) \
2015 static unsigned long get_work_bits(unsigned long *flags)
2017 unsigned long old, new;
2020 new = old & ~DRBD_DEVICE_WORK_MASK;
2021 } while (cmpxchg(flags, old, new) != old);
2022 return old & DRBD_DEVICE_WORK_MASK;
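/* get_work_bits() atomically fetches and clears the per-device work
 * bits: the cmpxchg loop retries until it swaps in the value with
 * DRBD_DEVICE_WORK_MASK cleared against an unchanged snapshot, so
 * flag bits set concurrently by other contexts are never lost. */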
2025 static void do_unqueued_work(struct drbd_connection *connection)
2027 struct drbd_peer_device *peer_device;
2031 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2032 struct drbd_device *device = peer_device->device;
2033 unsigned long todo = get_work_bits(&device->flags);
2037 kref_get(&device->kref);
2039 do_device_work(device, todo);
2040 kref_put(&device->kref, drbd_destroy_device);
2046 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2048 spin_lock_irq(&queue->q_lock);
2049 list_splice_tail_init(&queue->q, work_list);
2050 spin_unlock_irq(&queue->q_lock);
2051 return !list_empty(work_list);
2054 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2057 struct net_conf *nc;
2060 dequeue_work_batch(&connection->sender_work, work_list);
2061 if (!list_empty(work_list))
2064 /* Still nothing to do?
2065 * Maybe we still need to close the current epoch,
2066 * even if no new requests are queued yet.
2068 * Also, poke TCP, just in case.
2069 * Then wait for new work (or signal). */
2071 nc = rcu_dereference(connection->net_conf);
2072 uncork = nc ? nc->tcp_cork : 0;
2075 mutex_lock(&connection->data.mutex);
2076 if (connection->data.socket)
2077 drbd_tcp_uncork(connection->data.socket);
2078 mutex_unlock(&connection->data.mutex);
2083 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2084 spin_lock_irq(&connection->resource->req_lock);
2085 spin_lock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2086 if (!list_empty(&connection->sender_work.q))
2087 list_splice_tail_init(&connection->sender_work.q, work_list);
2088 spin_unlock(&connection->sender_work.q_lock); /* FIXME get rid of this one? */
2089 if (!list_empty(work_list) || signal_pending(current)) {
2090 spin_unlock_irq(&connection->resource->req_lock);
2094 /* We found nothing new to do, no to-be-communicated request,
2095 * no other work item. We may still need to close the last
2096 * epoch. Next incoming request epoch will be connection ->
2097 * current transfer log epoch number. If that is different
2098 * from the epoch of the last request we communicated, it is
2099 * safe to send the epoch separating barrier now.
2102 atomic_read(&connection->current_tle_nr) !=
2103 connection->send.current_epoch_nr;
2104 spin_unlock_irq(&connection->resource->req_lock);
2107 maybe_send_barrier(connection,
2108 connection->send.current_epoch_nr + 1);
2110 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2113 /* drbd_send() may have called flush_signals() */
2114 if (get_t_state(&connection->worker) != RUNNING)
2118 /* may be woken up for things other than new work, too,
2119 * e.g. if the current epoch got closed.
2120 * In which case we send the barrier above. */
2122 finish_wait(&connection->sender_work.q_wait, &wait);
2124 /* someone may have changed the config while we have been waiting above. */
2126 nc = rcu_dereference(connection->net_conf);
2127 cork = nc ? nc->tcp_cork : 0;
2129 mutex_lock(&connection->data.mutex);
2130 if (connection->data.socket) {
2132 drbd_tcp_cork(connection->data.socket);
2134 drbd_tcp_uncork(connection->data.socket);
2136 mutex_unlock(&connection->data.mutex);
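/* TCP_CORK handling: before going idle the worker uncorks the data
 * socket so anything still queued gets flushed; once new work has
 * been picked up it corks again (if configured), so a batch of small
 * packets can be coalesced into fewer segments. */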
2139 int drbd_worker(struct drbd_thread *thi)
2141 struct drbd_connection *connection = thi->connection;
2142 struct drbd_work *w = NULL;
2143 struct drbd_peer_device *peer_device;
2144 LIST_HEAD(work_list);
2147 while (get_t_state(thi) == RUNNING) {
2148 drbd_thread_current_set_cpu(thi);
2150 if (list_empty(&work_list)) {
2151 update_worker_timing_details(connection, wait_for_work);
2152 wait_for_work(connection, &work_list);
2155 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2156 update_worker_timing_details(connection, do_unqueued_work);
2157 do_unqueued_work(connection);
2160 if (signal_pending(current)) {
2161 flush_signals(current);
2162 if (get_t_state(thi) == RUNNING) {
2163 drbd_warn(connection, "Worker got an unexpected signal\n");
2169 if (get_t_state(thi) != RUNNING)
2172 if (!list_empty(&work_list)) {
2173 w = list_first_entry(&work_list, struct drbd_work, list);
2174 list_del_init(&w->list);
2175 update_worker_timing_details(connection, w->cb);
2176 if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2178 if (connection->cstate >= C_WF_REPORT_PARAMS)
2179 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2184 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2185 update_worker_timing_details(connection, do_unqueued_work);
2186 do_unqueued_work(connection);
2188 if (!list_empty(&work_list)) {
2189 w = list_first_entry(&work_list, struct drbd_work, list);
2190 list_del_init(&w->list);
2191 update_worker_timing_details(connection, w->cb);
2194 dequeue_work_batch(&connection->sender_work, &work_list);
2195 } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2198 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2199 struct drbd_device *device = peer_device->device;
2200 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2201 kref_get(&device->kref);
2203 drbd_device_cleanup(device);
2204 kref_put(&device->kref, drbd_destroy_device);