1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = bio->bi_error;
67
68         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
69          * to timeout on the lower level device, and eventually detach from it.
70          * If this io completion runs after that timeout expired, this
71          * drbd_md_put_buffer() may allow us to finally try and re-attach.
72          * During normal operation, this only puts that extra reference
73          * down to 1 again.
74          * Make sure we first drop the reference, and only then signal
75          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
76          * next drbd_md_sync_page_io(), that we trigger the
77          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
78          */
79         drbd_md_put_buffer(device);
80         device->md_io.done = 1;
81         wake_up(&device->misc_wait);
82         bio_put(bio);
83         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
84                 put_ldev(device);
85 }
86
87 /* reads on behalf of the partner,
88  * "submitted" by the receiver
89  */
90 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
91 {
92         unsigned long flags = 0;
93         struct drbd_peer_device *peer_device = peer_req->peer_device;
94         struct drbd_device *device = peer_device->device;
95
96         spin_lock_irqsave(&device->resource->req_lock, flags);
97         device->read_cnt += peer_req->i.size >> 9;
98         list_del(&peer_req->w.list);
99         if (list_empty(&device->read_ee))
100                 wake_up(&device->ee_wait);
101         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
102                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
103         spin_unlock_irqrestore(&device->resource->req_lock, flags);
104
105         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
106         put_ldev(device);
107 }
108
109 /* writes on behalf of the partner, or resync writes,
110  * "submitted" by the receiver, final stage.  */
111 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
112 {
113         unsigned long flags = 0;
114         struct drbd_peer_device *peer_device = peer_req->peer_device;
115         struct drbd_device *device = peer_device->device;
116         struct drbd_connection *connection = peer_device->connection;
117         struct drbd_interval i;
118         int do_wake;
119         u64 block_id;
120         int do_al_complete_io;
121
122         /* after we moved peer_req to done_ee,
123          * we may no longer access it,
124          * it may be freed/reused already!
125          * (as soon as we release the req_lock) */
126         i = peer_req->i;
127         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
128         block_id = peer_req->block_id;
129         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
130
131         spin_lock_irqsave(&device->resource->req_lock, flags);
132         device->writ_cnt += peer_req->i.size >> 9;
133         list_move_tail(&peer_req->w.list, &device->done_ee);
134
135         /*
136          * Do not remove from the write_requests tree here: we did not send the
137          * Ack yet and did not wake possibly waiting conflicting requests.
138          * Removal from the tree happens in "drbd_process_done_ee", within the
139          * appropriate dw.cb (e_end_block/e_end_resync_block), or in
140          * _drbd_clear_done_ee.
141          */
142
143         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
144
145         /* FIXME do we want to detach for failed REQ_DISCARD?
146          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
147         if (peer_req->flags & EE_WAS_ERROR)
148                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
149
150         if (connection->cstate >= C_WF_REPORT_PARAMS) {
151                 kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
152                 if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
153                         kref_put(&device->kref, drbd_destroy_device);
154         }
155         spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157         if (block_id == ID_SYNCER)
158                 drbd_rs_complete_io(device, i.sector);
159
160         if (do_wake)
161                 wake_up(&device->ee_wait);
162
163         if (do_al_complete_io)
164                 drbd_al_complete_io(device, &i);
165
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->peer_device->device;
176         bool is_write = bio_data_dir(bio) == WRITE;
177         bool is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
178
179         if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? (is_discard ? "discard" : "write")
182                                         : "read", bio->bi_error,
183                                 (unsigned long long)peer_req->i.sector);
184
185         if (bio->bi_error)
186                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
187
188         bio_put(bio); /* no need for the bio anymore */
189         if (atomic_dec_and_test(&peer_req->pending_bios)) {
190                 if (is_write)
191                         drbd_endio_write_sec_final(peer_req);
192                 else
193                         drbd_endio_read_sec_final(peer_req);
194         }
195 }
196
197 void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
198 {
199         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
200                 device->minor, device->resource->name, device->vnr);
201 }
202
203 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
204  */
205 void drbd_request_endio(struct bio *bio)
206 {
207         unsigned long flags;
208         struct drbd_request *req = bio->bi_private;
209         struct drbd_device *device = req->device;
210         struct bio_and_error m;
211         enum drbd_req_event what;
212
213         /* If this request was aborted locally before,
214          * but now was completed "successfully",
215          * chances are that this caused arbitrary data corruption.
216          *
217          * "aborting" requests, or force-detaching the disk, is intended for
218          * completely blocked/hung local backing devices which do no longer
219          * complete requests at all, not even do error completions.  In this
220          * situation, usually a hard-reset and failover is the only way out.
221          *
222          * By "aborting", basically faking a local error-completion,
223          * we allow for a more graceful switchover by cleanly migrating services.
224          * Still the affected node has to be rebooted "soon".
225          *
226          * By completing these requests, we allow the upper layers to re-use
227          * the associated data pages.
228          *
229          * If later the local backing device "recovers", and now DMAs some data
230          * from disk into the original request pages, in the best case it will
231          * just put random data into unused pages; but typically it will corrupt
232          * meanwhile completely unrelated data, causing all sorts of damage.
233          *
234          * Which means delayed successful completion,
235          * especially for READ requests,
236          * is a reason to panic().
237          *
238          * We assume that a delayed *error* completion is OK,
239          * though we still will complain noisily about it.
240          */
241         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
242                 if (__ratelimit(&drbd_ratelimit_state))
243                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
244
245                 if (!bio->bi_error)
246                         drbd_panic_after_delayed_completion_of_aborted_request(device);
247         }
248
249         /* to avoid recursion in __req_mod */
250         if (unlikely(bio->bi_error)) {
251                 if (bio_op(bio) == REQ_OP_DISCARD)
252                         what = (bio->bi_error == -EOPNOTSUPP)
253                                 ? DISCARD_COMPLETED_NOTSUPP
254                                 : DISCARD_COMPLETED_WITH_ERROR;
255                 else
256                         what = (bio_data_dir(bio) == WRITE)
257                         ? WRITE_COMPLETED_WITH_ERROR
258                         : (bio_rw(bio) == READ)
259                           ? READ_COMPLETED_WITH_ERROR
260                           : READ_AHEAD_COMPLETED_WITH_ERROR;
261         } else
262                 what = COMPLETED_OK;
263
264         bio_put(req->private_bio);
265         req->private_bio = ERR_PTR(bio->bi_error);
266
267         /* not req_mod(), we need irqsave here! */
268         spin_lock_irqsave(&device->resource->req_lock, flags);
269         __req_mod(req, what, &m);
270         spin_unlock_irqrestore(&device->resource->req_lock, flags);
271         put_ldev(device);
272
273         if (m.bio)
274                 complete_master_bio(device, &m);
275 }
276
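/* Compute a digest over all payload pages of a peer request.  The page
 * chain is walked with page_chain_next(); every page but the last is
 * hashed in full, the last one only for the remaining
 * peer_req->i.size % PAGE_SIZE bytes (or in full, if the size is page
 * aligned).  The result is written to *digest. */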
277 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
278 {
279         AHASH_REQUEST_ON_STACK(req, tfm);
280         struct scatterlist sg;
281         struct page *page = peer_req->pages;
282         struct page *tmp;
283         unsigned len;
284
285         ahash_request_set_tfm(req, tfm);
286         ahash_request_set_callback(req, 0, NULL, NULL);
287
288         sg_init_table(&sg, 1);
289         crypto_ahash_init(req);
290
291         while ((tmp = page_chain_next(page))) {
292                 /* all but the last page will be fully used */
293                 sg_set_page(&sg, page, PAGE_SIZE, 0);
294                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
295                 crypto_ahash_update(req);
296                 page = tmp;
297         }
298         /* and now the last, possibly only partially used page */
299         len = peer_req->i.size & (PAGE_SIZE - 1);
300         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
301         ahash_request_set_crypt(req, &sg, digest, sg.length);
302         crypto_ahash_finup(req);
303         ahash_request_zero(req);
304 }
305
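/* Like drbd_csum_ee(), but for a struct bio: hash each bio_vec segment and
 * write the result to *digest.  For REQ_OP_WRITE_SAME the payload is a
 * single repeated segment, so only the first segment is checksummed. */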
306 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
307 {
308         AHASH_REQUEST_ON_STACK(req, tfm);
309         struct scatterlist sg;
310         struct bio_vec bvec;
311         struct bvec_iter iter;
312
313         ahash_request_set_tfm(req, tfm);
314         ahash_request_set_callback(req, 0, NULL, NULL);
315
316         sg_init_table(&sg, 1);
317         crypto_ahash_init(req);
318
319         bio_for_each_segment(bvec, bio, iter) {
320                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
321                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
322                 crypto_ahash_update(req);
323                 /* REQ_OP_WRITE_SAME has only one segment,
324                  * checksum the payload only once. */
325                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
326                         break;
327         }
328         ahash_request_set_crypt(req, NULL, digest, 0);
329         crypto_ahash_final(req);
330         ahash_request_zero(req);
331 }
332
333 /* MAYBE merge common code with w_e_end_ov_req */
334 static int w_e_send_csum(struct drbd_work *w, int cancel)
335 {
336         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
337         struct drbd_peer_device *peer_device = peer_req->peer_device;
338         struct drbd_device *device = peer_device->device;
339         int digest_size;
340         void *digest;
341         int err = 0;
342
343         if (unlikely(cancel))
344                 goto out;
345
346         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
347                 goto out;
348
349         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
350         digest = kmalloc(digest_size, GFP_NOIO);
351         if (digest) {
352                 sector_t sector = peer_req->i.sector;
353                 unsigned int size = peer_req->i.size;
354                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
355                 /* Free peer_req and pages before send.
356                  * In case we block on congestion, we could otherwise run into
357                  * some distributed deadlock, if the other side blocks on
358                  * congestion as well, because our receiver blocks in
359                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
360                 drbd_free_peer_req(device, peer_req);
361                 peer_req = NULL;
362                 inc_rs_pending(device);
363                 err = drbd_send_drequest_csum(peer_device, sector, size,
364                                               digest, digest_size,
365                                               P_CSUM_RS_REQUEST);
366                 kfree(digest);
367         } else {
368                 drbd_err(device, "kmalloc() of digest failed.\n");
369                 err = -ENOMEM;
370         }
371
372 out:
373         if (peer_req)
374                 drbd_free_peer_req(device, peer_req);
375
376         if (unlikely(err))
377                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
378         return err;
379 }
380
381 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
382
383 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
384 {
385         struct drbd_device *device = peer_device->device;
386         struct drbd_peer_request *peer_req;
387
388         if (!get_ldev(device))
389                 return -EIO;
390
391         /* GFP_TRY, because if there is no memory available right now, this may
392          * be rescheduled for later. It is "only" background resync, after all. */
393         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
394                                        size, size, GFP_TRY);
395         if (!peer_req)
396                 goto defer;
397
398         peer_req->w.cb = w_e_send_csum;
399         spin_lock_irq(&device->resource->req_lock);
400         list_add_tail(&peer_req->w.list, &device->read_ee);
401         spin_unlock_irq(&device->resource->req_lock);
402
403         atomic_add(size >> 9, &device->rs_sect_ev);
404         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
405                                      DRBD_FAULT_RS_RD) == 0)
406                 return 0;
407
408         /* If it failed because of ENOMEM, retry should help.  If it failed
409          * because bio_add_page failed (probably broken lower level driver),
410          * retry may or may not help.
411          * If it does not, you may need to force disconnect. */
412         spin_lock_irq(&device->resource->req_lock);
413         list_del(&peer_req->w.list);
414         spin_unlock_irq(&device->resource->req_lock);
415
416         drbd_free_peer_req(device, peer_req);
417 defer:
418         put_ldev(device);
419         return -EAGAIN;
420 }
421
422 int w_resync_timer(struct drbd_work *w, int cancel)
423 {
424         struct drbd_device *device =
425                 container_of(w, struct drbd_device, resync_work);
426
427         switch (device->state.conn) {
428         case C_VERIFY_S:
429                 make_ov_request(device, cancel);
430                 break;
431         case C_SYNC_TARGET:
432                 make_resync_request(device, cancel);
433                 break;
434         }
435
436         return 0;
437 }
438
439 void resync_timer_fn(unsigned long data)
440 {
441         struct drbd_device *device = (struct drbd_device *) data;
442
443         drbd_queue_work_if_unqueued(
444                 &first_peer_device(device)->connection->sender_work,
445                 &device->resync_work);
446 }
447
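/* The fifo_buffer helpers below implement the "plan" of the resync rate
 * controller: a fixed-size ring of per-step correction values (in sectors).
 * fifo_set() initializes every slot, fifo_push() returns the value at the
 * head and stores a new one there, and fifo_add_val() adds the same amount
 * to every slot.  The running plan->total is maintained by the caller,
 * drbd_rs_controller(). */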
448 static void fifo_set(struct fifo_buffer *fb, int value)
449 {
450         int i;
451
452         for (i = 0; i < fb->size; i++)
453                 fb->values[i] = value;
454 }
455
456 static int fifo_push(struct fifo_buffer *fb, int value)
457 {
458         int ov;
459
460         ov = fb->values[fb->head_index];
461         fb->values[fb->head_index++] = value;
462
463         if (fb->head_index >= fb->size)
464                 fb->head_index = 0;
465
466         return ov;
467 }
468
469 static void fifo_add_val(struct fifo_buffer *fb, int value)
470 {
471         int i;
472
473         for (i = 0; i < fb->size; i++)
474                 fb->values[i] += value;
475 }
476
477 struct fifo_buffer *fifo_alloc(int fifo_size)
478 {
479         struct fifo_buffer *fb;
480
481         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
482         if (!fb)
483                 return NULL;
484
485         fb->head_index = 0;
486         fb->size = fifo_size;
487         fb->total = 0;
488
489         return fb;
490 }
491
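/* Resync rate controller: decide how many sectors to request within the
 * next SLEEP_TIME (100ms) interval, based on how much resync data came in
 * since the last interval (sect_in), how much is still in flight, and the
 * configured targets (c_fill_target or c_delay_target, capped by
 * c_max_rate).
 *
 * Rough worked example with made-up numbers: steps = 10,
 * c_fill_target = 1000 sectors, rs_in_flight = 600, plan->total = 200,
 * sect_in = 400.  Then correction = 1000 - 600 - 200 = 200, so
 * cps = 200 / 10 = 20 is added to every plan slot (plan->total becomes 400).
 * If the head slot held 0 before, fifo_push() now returns curr_corr = 20,
 * and we request req_sect = 400 + 20 = 420 sectors in this turn (unless
 * c_max_rate clamps it). */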
492 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
493 {
494         struct disk_conf *dc;
495         unsigned int want;     /* The number of sectors we want in-flight */
496         int req_sect; /* Number of sectors to request in this turn */
497         int correction; /* Number of sectors more we need in-flight */
498         int cps; /* correction per invocation of drbd_rs_controller() */
499         int steps; /* Number of time steps to plan ahead */
500         int curr_corr;
501         int max_sect;
502         struct fifo_buffer *plan;
503
504         dc = rcu_dereference(device->ldev->disk_conf);
505         plan = rcu_dereference(device->rs_plan_s);
506
507         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
508
509         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
510                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
511         } else { /* normal path */
512                 want = dc->c_fill_target ? dc->c_fill_target :
513                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
514         }
515
516         correction = want - device->rs_in_flight - plan->total;
517
518         /* Plan ahead */
519         cps = correction / steps;
520         fifo_add_val(plan, cps);
521         plan->total += cps * steps;
522
523         /* What we do in this step */
524         curr_corr = fifo_push(plan, 0);
525         plan->total -= curr_corr;
526
527         req_sect = sect_in + curr_corr;
528         if (req_sect < 0)
529                 req_sect = 0;
530
531         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
532         if (req_sect > max_sect)
533                 req_sect = max_sect;
534
535         /*
536         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
537                  sect_in, device->rs_in_flight, want, correction,
538                  steps, cps, device->rs_planed, curr_corr, req_sect);
539         */
540
541         return req_sect;
542 }
543
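/* Illustrative unit conversion for drbd_rs_number_requests() below: the
 * controller returns sectors; with BM_BLOCK_SHIFT = 12 the shift by
 * (BM_BLOCK_SHIFT - 9) divides by 8, so e.g. 420 sectors become 52 resync
 * requests of BM_BLOCK_SIZE (4 KiB) each. */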
544 static int drbd_rs_number_requests(struct drbd_device *device)
545 {
546         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
547         int number, mxb;
548
549         sect_in = atomic_xchg(&device->rs_sect_in, 0);
550         device->rs_in_flight -= sect_in;
551
552         rcu_read_lock();
553         mxb = drbd_get_max_buffers(device) / 2;
554         if (rcu_dereference(device->rs_plan_s)->size) {
555                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
556                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
557         } else {
558                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
559                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
560         }
561         rcu_read_unlock();
562
563         /* Don't have more than "max-buffers"/2 in-flight.
564          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
565          * potentially causing a distributed deadlock on congestion during
566          * online-verify or (checksum-based) resync, if max-buffers,
567          * socket buffer sizes and resync rate settings are mis-configured. */
568
569         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
570          * mxb (as used here, and in drbd_alloc_pages on the peer) is
571          * "number of pages" (typically also 4k),
572          * but "rs_in_flight" is in "sectors" (512 Byte). */
573         if (mxb - device->rs_in_flight/8 < number)
574                 number = mxb - device->rs_in_flight/8;
575
576         return number;
577 }
578
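/* Walk the out-of-sync bitmap and generate up to "number" resync requests
 * for this turn: merge adjacent dirty bits into larger, aligned requests
 * (bounded by max_bio_size, or by rs_discard_granularity for thin resync),
 * then either read the data locally first (checksum based resync, see
 * read_for_csum()) or send P_RS_DATA_REQUEST / P_RS_THIN_REQ to the peer
 * directly. */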
579 static int make_resync_request(struct drbd_device *const device, int cancel)
580 {
581         struct drbd_peer_device *const peer_device = first_peer_device(device);
582         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
583         unsigned long bit;
584         sector_t sector;
585         const sector_t capacity = drbd_get_capacity(device->this_bdev);
586         int max_bio_size;
587         int number, rollback_i, size;
588         int align, requeue = 0;
589         int i = 0;
590         int discard_granularity = 0;
591
592         if (unlikely(cancel))
593                 return 0;
594
595         if (device->rs_total == 0) {
596                 /* empty resync? */
597                 drbd_resync_finished(device);
598                 return 0;
599         }
600
601         if (!get_ldev(device)) {
602                 /* Since we only need to access device->resync, a
603                    get_ldev_if_state(device, D_FAILED) would be sufficient, but
604                    continuing a resync with a broken disk makes no sense at
605                    all */
606                 drbd_err(device, "Disk broke down during resync!\n");
607                 return 0;
608         }
609
610         if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
611                 rcu_read_lock();
612                 discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
613                 rcu_read_unlock();
614         }
615
616         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
617         number = drbd_rs_number_requests(device);
618         if (number <= 0)
619                 goto requeue;
620
621         for (i = 0; i < number; i++) {
622                 /* Stop generating RS requests when half of the send buffer is filled,
623                  * but notify TCP that we'd like to have more space. */
624                 mutex_lock(&connection->data.mutex);
625                 if (connection->data.socket) {
626                         struct sock *sk = connection->data.socket->sk;
627                         int queued = sk->sk_wmem_queued;
628                         int sndbuf = sk->sk_sndbuf;
629                         if (queued > sndbuf / 2) {
630                                 requeue = 1;
631                                 if (sk->sk_socket)
632                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
633                         }
634                 } else
635                         requeue = 1;
636                 mutex_unlock(&connection->data.mutex);
637                 if (requeue)
638                         goto requeue;
639
640 next_sector:
641                 size = BM_BLOCK_SIZE;
642                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
643
644                 if (bit == DRBD_END_OF_BITMAP) {
645                         device->bm_resync_fo = drbd_bm_bits(device);
646                         put_ldev(device);
647                         return 0;
648                 }
649
650                 sector = BM_BIT_TO_SECT(bit);
651
652                 if (drbd_try_rs_begin_io(device, sector)) {
653                         device->bm_resync_fo = bit;
654                         goto requeue;
655                 }
656                 device->bm_resync_fo = bit + 1;
657
658                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
659                         drbd_rs_complete_io(device, sector);
660                         goto next_sector;
661                 }
662
663 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
664                 /* try to find some adjacent bits.
665                  * we stop if we already have the maximum req size.
666                  *
667                  * Additionally always align bigger requests, in order to
668                  * be prepared for all stripe sizes of software RAIDs.
669                  */
670                 align = 1;
671                 rollback_i = i;
672                 while (i < number) {
673                         if (size + BM_BLOCK_SIZE > max_bio_size)
674                                 break;
675
676                         /* Be always aligned */
677                         if (sector & ((1<<(align+3))-1))
678                                 break;
679
680                         if (discard_granularity && size == discard_granularity)
681                                 break;
682
683                         /* do not cross extent boundaries */
684                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
685                                 break;
686                         /* now, is it actually dirty, after all?
687                          * caution, drbd_bm_test_bit is tri-state for some
688                          * obscure reason; ( b == 0 ) would get the out-of-band
689                          * only accidentally right because of the "oddly sized"
690                          * adjustment below */
691                         if (drbd_bm_test_bit(device, bit+1) != 1)
692                                 break;
693                         bit++;
694                         size += BM_BLOCK_SIZE;
695                         if ((BM_BLOCK_SIZE << align) <= size)
696                                 align++;
697                         i++;
698                 }
699                 /* if we merged some,
700                  * reset the offset to start the next drbd_bm_find_next from */
701                 if (size > BM_BLOCK_SIZE)
702                         device->bm_resync_fo = bit + 1;
703 #endif
704
705                 /* adjust very last sectors, in case we are oddly sized */
706                 if (sector + (size>>9) > capacity)
707                         size = (capacity-sector)<<9;
708
709                 if (device->use_csums) {
710                         switch (read_for_csum(peer_device, sector, size)) {
711                         case -EIO: /* Disk failure */
712                                 put_ldev(device);
713                                 return -EIO;
714                         case -EAGAIN: /* allocation failed, or ldev busy */
715                                 drbd_rs_complete_io(device, sector);
716                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
717                                 i = rollback_i;
718                                 goto requeue;
719                         case 0:
720                                 /* everything ok */
721                                 break;
722                         default:
723                                 BUG();
724                         }
725                 } else {
726                         int err;
727
728                         inc_rs_pending(device);
729                         err = drbd_send_drequest(peer_device,
730                                                  size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
731                                                  sector, size, ID_SYNCER);
732                         if (err) {
733                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
734                                 dec_rs_pending(device);
735                                 put_ldev(device);
736                                 return err;
737                         }
738                 }
739         }
740
741         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
742                 /* last syncer _request_ was sent,
743                  * but the P_RS_DATA_REPLY has not yet been received.  Sync will end (and
744                  * next sync group will resume), as soon as we receive the last
745                  * resync data block, and the last bit is cleared.
746                  * until then resync "work" is "inactive" ...
747                  */
748                 put_ldev(device);
749                 return 0;
750         }
751
752  requeue:
753         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
754         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
755         put_ldev(device);
756         return 0;
757 }
758
759 static int make_ov_request(struct drbd_device *device, int cancel)
760 {
761         int number, i, size;
762         sector_t sector;
763         const sector_t capacity = drbd_get_capacity(device->this_bdev);
764         bool stop_sector_reached = false;
765
766         if (unlikely(cancel))
767                 return 1;
768
769         number = drbd_rs_number_requests(device);
770
771         sector = device->ov_position;
772         for (i = 0; i < number; i++) {
773                 if (sector >= capacity)
774                         return 1;
775
776                 /* We check for "finished" only in the reply path:
777                  * w_e_end_ov_reply().
778                  * We need to send at least one request out. */
779                 stop_sector_reached = i > 0
780                         && verify_can_do_stop_sector(device)
781                         && sector >= device->ov_stop_sector;
782                 if (stop_sector_reached)
783                         break;
784
785                 size = BM_BLOCK_SIZE;
786
787                 if (drbd_try_rs_begin_io(device, sector)) {
788                         device->ov_position = sector;
789                         goto requeue;
790                 }
791
792                 if (sector + (size>>9) > capacity)
793                         size = (capacity-sector)<<9;
794
795                 inc_rs_pending(device);
796                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
797                         dec_rs_pending(device);
798                         return 0;
799                 }
800                 sector += BM_SECT_PER_BIT;
801         }
802         device->ov_position = sector;
803
804  requeue:
805         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
806         if (i == 0 || !stop_sector_reached)
807                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
808         return 1;
809 }
810
811 int w_ov_finished(struct drbd_work *w, int cancel)
812 {
813         struct drbd_device_work *dw =
814                 container_of(w, struct drbd_device_work, w);
815         struct drbd_device *device = dw->device;
816         kfree(dw);
817         ov_out_of_sync_print(device);
818         drbd_resync_finished(device);
819
820         return 0;
821 }
822
823 static int w_resync_finished(struct drbd_work *w, int cancel)
824 {
825         struct drbd_device_work *dw =
826                 container_of(w, struct drbd_device_work, w);
827         struct drbd_device *device = dw->device;
828         kfree(dw);
829
830         drbd_resync_finished(device);
831
832         return 0;
833 }
834
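/* Send a ping to the peer and wait until either the ping ack arrives or the
 * connection drops below C_CONNECTED. */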
835 static void ping_peer(struct drbd_device *device)
836 {
837         struct drbd_connection *connection = first_peer_device(device)->connection;
838
839         clear_bit(GOT_PING_ACK, &connection->flags);
840         request_ping(connection);
841         wait_event(connection->ping_wait,
842                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
843 }
844
845 int drbd_resync_finished(struct drbd_device *device)
846 {
847         struct drbd_connection *connection = first_peer_device(device)->connection;
848         unsigned long db, dt, dbdt;
849         unsigned long n_oos;
850         union drbd_state os, ns;
851         struct drbd_device_work *dw;
852         char *khelper_cmd = NULL;
853         int verify_done = 0;
854
855         /* Remove all elements from the resync LRU. Future actions
856          * might set bits in the (main) bitmap, which would make the entries
857          * in the resync LRU wrong. */
858         if (drbd_rs_del_all(device)) {
859                 /* In case this is not possible now, most probably because
860                  * there are P_RS_DATA_REPLY packets lingering on the worker's
861                  * queue (or even the read operations for those packets
862                  * are not finished by now).   Retry in 100ms. */
863
864                 schedule_timeout_interruptible(HZ / 10);
865                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
866                 if (dw) {
867                         dw->w.cb = w_resync_finished;
868                         dw->device = device;
869                         drbd_queue_work(&connection->sender_work, &dw->w);
870                         return 1;
871                 }
872                 drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
873         }
874
875         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
876         if (dt <= 0)
877                 dt = 1;
878
879         db = device->rs_total;
880         /* adjust for verify start and stop sectors, respectively the position reached */
881         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
882                 db -= device->ov_left;
883
884         dbdt = Bit2KB(db/dt);
885         device->rs_paused /= HZ;
886
887         if (!get_ldev(device))
888                 goto out;
889
890         ping_peer(device);
891
892         spin_lock_irq(&device->resource->req_lock);
893         os = drbd_read_state(device);
894
895         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
896
897         /* This protects us against multiple calls (that can happen in the presence
898            of application IO), and against connectivity loss just before we arrive here. */
899         if (os.conn <= C_CONNECTED)
900                 goto out_unlock;
901
902         ns = os;
903         ns.conn = C_CONNECTED;
904
905         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
906              verify_done ? "Online verify" : "Resync",
907              dt + device->rs_paused, device->rs_paused, dbdt);
908
909         n_oos = drbd_bm_total_weight(device);
910
911         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
912                 if (n_oos) {
913                         drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
914                               n_oos, Bit2KB(1));
915                         khelper_cmd = "out-of-sync";
916                 }
917         } else {
918                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
919
920                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
921                         khelper_cmd = "after-resync-target";
922
923                 if (device->use_csums && device->rs_total) {
924                         const unsigned long s = device->rs_same_csum;
925                         const unsigned long t = device->rs_total;
926                         const int ratio =
927                                 (t == 0)     ? 0 :
928                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
929                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
930                              "transferred %luK total %luK\n",
931                              ratio,
932                              Bit2KB(device->rs_same_csum),
933                              Bit2KB(device->rs_total - device->rs_same_csum),
934                              Bit2KB(device->rs_total));
935                 }
936         }
937
938         if (device->rs_failed) {
939                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
940
941                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
942                         ns.disk = D_INCONSISTENT;
943                         ns.pdsk = D_UP_TO_DATE;
944                 } else {
945                         ns.disk = D_UP_TO_DATE;
946                         ns.pdsk = D_INCONSISTENT;
947                 }
948         } else {
949                 ns.disk = D_UP_TO_DATE;
950                 ns.pdsk = D_UP_TO_DATE;
951
952                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
953                         if (device->p_uuid) {
954                                 int i;
955                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
956                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
957                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
958                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
959                         } else {
960                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
961                         }
962                 }
963
964                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
965                         /* for verify runs, we don't update uuids here,
966                          * so there would be nothing to report. */
967                         drbd_uuid_set_bm(device, 0UL);
968                         drbd_print_uuids(device, "updated UUIDs");
969                         if (device->p_uuid) {
970                                 /* Now the two UUID sets are equal, update what we
971                                  * know of the peer. */
972                                 int i;
973                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
974                                         device->p_uuid[i] = device->ldev->md.uuid[i];
975                         }
976                 }
977         }
978
979         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
980 out_unlock:
981         spin_unlock_irq(&device->resource->req_lock);
982
983         /* If we have been sync source, and have an effective fencing-policy,
984          * once *all* volumes are back in sync, call "unfence". */
985         if (os.conn == C_SYNC_SOURCE) {
986                 enum drbd_disk_state disk_state = D_MASK;
987                 enum drbd_disk_state pdsk_state = D_MASK;
988                 enum drbd_fencing_p fp = FP_DONT_CARE;
989
990                 rcu_read_lock();
991                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
992                 if (fp != FP_DONT_CARE) {
993                         struct drbd_peer_device *peer_device;
994                         int vnr;
995                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
996                                 struct drbd_device *device = peer_device->device;
997                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
998                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
999                         }
1000                 }
1001                 rcu_read_unlock();
1002                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1003                         conn_khelper(connection, "unfence-peer");
1004         }
1005
1006         put_ldev(device);
1007 out:
1008         device->rs_total  = 0;
1009         device->rs_failed = 0;
1010         device->rs_paused = 0;
1011
1012         /* reset start sector, if we reached end of device */
1013         if (verify_done && device->ov_left == 0)
1014                 device->ov_start_sector = 0;
1015
1016         drbd_md_sync(device);
1017
1018         if (khelper_cmd)
1019                 drbd_khelper(device, khelper_cmd);
1020
1021         return 1;
1022 }
1023
1024 /* helper */
1025 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1026 {
1027         if (drbd_peer_req_has_active_page(peer_req)) {
1028                 /* This might happen if sendpage() has not finished */
1029                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1030                 atomic_add(i, &device->pp_in_use_by_net);
1031                 atomic_sub(i, &device->pp_in_use);
1032                 spin_lock_irq(&device->resource->req_lock);
1033                 list_add_tail(&peer_req->w.list, &device->net_ee);
1034                 spin_unlock_irq(&device->resource->req_lock);
1035                 wake_up(&drbd_pp_wait);
1036         } else
1037                 drbd_free_peer_req(device, peer_req);
1038 }
1039
1040 /**
1041  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1042  * @w:          work object.
1043  * @cancel:     The connection will be closed anyway
1044  */
1045 int w_e_end_data_req(struct drbd_work *w, int cancel)
1046 {
1047         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1048         struct drbd_peer_device *peer_device = peer_req->peer_device;
1049         struct drbd_device *device = peer_device->device;
1050         int err;
1051
1052         if (unlikely(cancel)) {
1053                 drbd_free_peer_req(device, peer_req);
1054                 dec_unacked(device);
1055                 return 0;
1056         }
1057
1058         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1059                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1060         } else {
1061                 if (__ratelimit(&drbd_ratelimit_state))
1062                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1063                             (unsigned long long)peer_req->i.sector);
1064
1065                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1066         }
1067
1068         dec_unacked(device);
1069
1070         move_to_net_ee_or_free(device, peer_req);
1071
1072         if (unlikely(err))
1073                 drbd_err(device, "drbd_send_block() failed\n");
1074         return err;
1075 }
1076
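/* Check whether the payload of a peer request consists only of zero bytes.
 * Used for thin resync: an all-zero block can be reported via
 * drbd_send_rs_deallocated() instead of sending the full data
 * (see w_e_end_rsdata_req()). */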
1077 static bool all_zero(struct drbd_peer_request *peer_req)
1078 {
1079         struct page *page = peer_req->pages;
1080         unsigned int len = peer_req->i.size;
1081
1082         page_chain_for_each(page) {
1083                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1084                 unsigned int i, words = l / sizeof(long);
1085                 unsigned long *d;
1086
1087                 d = kmap_atomic(page);
1088                 for (i = 0; i < words; i++) {
1089                         if (d[i]) {
1090                                 kunmap_atomic(d);
1091                                 return false;
1092                         }
1093                 }
1094                 kunmap_atomic(d);
1095                 len -= l;
1096         }
1097
1098         return true;
1099 }
1100
1101 /**
1102  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1103  * @w:          work object.
1104  * @cancel:     The connection will be closed anyway
1105  */
1106 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1107 {
1108         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1109         struct drbd_peer_device *peer_device = peer_req->peer_device;
1110         struct drbd_device *device = peer_device->device;
1111         int err;
1112
1113         if (unlikely(cancel)) {
1114                 drbd_free_peer_req(device, peer_req);
1115                 dec_unacked(device);
1116                 return 0;
1117         }
1118
1119         if (get_ldev_if_state(device, D_FAILED)) {
1120                 drbd_rs_complete_io(device, peer_req->i.sector);
1121                 put_ldev(device);
1122         }
1123
1124         if (device->state.conn == C_AHEAD) {
1125                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1126         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1127                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1128                         inc_rs_pending(device);
1129                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1130                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1131                         else
1132                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1133                 } else {
1134                         if (__ratelimit(&drbd_ratelimit_state))
1135                                 drbd_err(device, "Not sending RSDataReply, "
1136                                     "partner DISKLESS!\n");
1137                         err = 0;
1138                 }
1139         } else {
1140                 if (__ratelimit(&drbd_ratelimit_state))
1141                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1142                             (unsigned long long)peer_req->i.sector);
1143
1144                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1145
1146                 /* update resync data with failure */
1147                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1148         }
1149
1150         dec_unacked(device);
1151
1152         move_to_net_ee_or_free(device, peer_req);
1153
1154         if (unlikely(err))
1155                 drbd_err(device, "drbd_send_block() failed\n");
1156         return err;
1157 }
1158
1159 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1160 {
1161         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1162         struct drbd_peer_device *peer_device = peer_req->peer_device;
1163         struct drbd_device *device = peer_device->device;
1164         struct digest_info *di;
1165         int digest_size;
1166         void *digest = NULL;
1167         int err, eq = 0;
1168
1169         if (unlikely(cancel)) {
1170                 drbd_free_peer_req(device, peer_req);
1171                 dec_unacked(device);
1172                 return 0;
1173         }
1174
1175         if (get_ldev(device)) {
1176                 drbd_rs_complete_io(device, peer_req->i.sector);
1177                 put_ldev(device);
1178         }
1179
1180         di = peer_req->digest;
1181
1182         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1183                 /* quick hack to try to avoid a race against reconfiguration.
1184                  * a real fix would be much more involved,
1185                  * introducing more locking mechanisms */
1186                 if (peer_device->connection->csums_tfm) {
1187                         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1188                         D_ASSERT(device, digest_size == di->digest_size);
1189                         digest = kmalloc(digest_size, GFP_NOIO);
1190                 }
1191                 if (digest) {
1192                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1193                         eq = !memcmp(digest, di->digest, digest_size);
1194                         kfree(digest);
1195                 }
1196
1197                 if (eq) {
1198                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1199                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1200                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1201                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1202                 } else {
1203                         inc_rs_pending(device);
1204                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1205                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1206                         kfree(di);
1207                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1208                 }
1209         } else {
1210                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1211                 if (__ratelimit(&drbd_ratelimit_state))
1212                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1213         }
1214
1215         dec_unacked(device);
1216         move_to_net_ee_or_free(device, peer_req);
1217
1218         if (unlikely(err))
1219                 drbd_err(device, "drbd_send_block/ack() failed\n");
1220         return err;
1221 }
1222
1223 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1224 {
1225         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1226         struct drbd_peer_device *peer_device = peer_req->peer_device;
1227         struct drbd_device *device = peer_device->device;
1228         sector_t sector = peer_req->i.sector;
1229         unsigned int size = peer_req->i.size;
1230         int digest_size;
1231         void *digest;
1232         int err = 0;
1233
1234         if (unlikely(cancel))
1235                 goto out;
1236
1237         digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1238         digest = kmalloc(digest_size, GFP_NOIO);
1239         if (!digest) {
1240                 err = 1;        /* terminate the connection in case the allocation failed */
1241                 goto out;
1242         }
1243
1244         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1245                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1246         else
1247                 memset(digest, 0, digest_size);
1248
1249         /* Free peer_req and pages before send.
1250          * In case we block on congestion, we could otherwise run into
1251          * some distributed deadlock, if the other side blocks on
1252          * congestion as well, because our receiver blocks in
1253          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1254         drbd_free_peer_req(device, peer_req);
1255         peer_req = NULL;
1256         inc_rs_pending(device);
1257         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1258         if (err)
1259                 dec_rs_pending(device);
1260         kfree(digest);
1261
1262 out:
1263         if (peer_req)
1264                 drbd_free_peer_req(device, peer_req);
1265         dec_unacked(device);
1266         return err;
1267 }
1268
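/* Record an out-of-sync range found by online verify.  Adjacent ranges are
 * coalesced into ov_last_oos_start/ov_last_oos_size (in sectors) so that
 * ov_out_of_sync_print() can report them as one block, and the range is
 * marked out of sync in the bitmap. */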
1269 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1270 {
1271         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1272                 device->ov_last_oos_size += size>>9;
1273         } else {
1274                 device->ov_last_oos_start = sector;
1275                 device->ov_last_oos_size = size>>9;
1276         }
1277         drbd_set_out_of_sync(device, sector, size);
1278 }
1279
1280 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1281 {
1282         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1283         struct drbd_peer_device *peer_device = peer_req->peer_device;
1284         struct drbd_device *device = peer_device->device;
1285         struct digest_info *di;
1286         void *digest;
1287         sector_t sector = peer_req->i.sector;
1288         unsigned int size = peer_req->i.size;
1289         int digest_size;
1290         int err, eq = 0;
1291         bool stop_sector_reached = false;
1292
1293         if (unlikely(cancel)) {
1294                 drbd_free_peer_req(device, peer_req);
1295                 dec_unacked(device);
1296                 return 0;
1297         }
1298
1299         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1300          * the resync lru has been cleaned up already */
1301         if (get_ldev(device)) {
1302                 drbd_rs_complete_io(device, peer_req->i.sector);
1303                 put_ldev(device);
1304         }
1305
1306         di = peer_req->digest;
1307
1308         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1309                 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1310                 digest = kmalloc(digest_size, GFP_NOIO);
1311                 if (digest) {
1312                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1313
1314                         D_ASSERT(device, digest_size == di->digest_size);
1315                         eq = !memcmp(digest, di->digest, digest_size);
1316                         kfree(digest);
1317                 }
1318         }
1319
1320         /* Free peer_req and pages before send.
1321          * In case we block on congestion, we could otherwise run into
1322          * some distributed deadlock, if the other side blocks on
1323          * congestion as well, because our receiver blocks in
1324          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1325         drbd_free_peer_req(device, peer_req);
1326         if (!eq)
1327                 drbd_ov_out_of_sync_found(device, sector, size);
1328         else
1329                 ov_out_of_sync_print(device);
1330
1331         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1332                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1333
1334         dec_unacked(device);
1335
1336         --device->ov_left;
1337
1338         /* let's advance progress step marks only for every other megabyte */
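        /* (ov_left counts bitmap bits; assuming the usual 4 KiB per bit,
         * i.e. BM_BLOCK_SHIFT == 12, a granularity of 0x200 bits is 2 MiB,
         * which is where "every other megabyte" comes from.) */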
1339         if ((device->ov_left & 0x200) == 0x200)
1340                 drbd_advance_rs_marks(device, device->ov_left);
1341
1342         stop_sector_reached = verify_can_do_stop_sector(device) &&
1343                 (sector + (size>>9)) >= device->ov_stop_sector;
1344
1345         if (device->ov_left == 0 || stop_sector_reached) {
1346                 ov_out_of_sync_print(device);
1347                 drbd_resync_finished(device);
1348         }
1349
1350         return err;
1351 }
1352
1353 /* FIXME
1354  * We need to track the number of pending barrier acks,
1355  * and to be able to wait for them.
1356  * See also comment in drbd_adm_attach before drbd_suspend_io.
1357  */
1358 static int drbd_send_barrier(struct drbd_connection *connection)
1359 {
1360         struct p_barrier *p;
1361         struct drbd_socket *sock;
1362
1363         sock = &connection->data;
1364         p = conn_prepare_command(connection, sock);
1365         if (!p)
1366                 return -EIO;
1367         p->barrier = connection->send.current_epoch_nr;
1368         p->pad = 0;
1369         connection->send.current_epoch_writes = 0;
1370         connection->send.last_sent_barrier_jif = jiffies;
1371
1372         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1373 }
1374
1375 int w_send_write_hint(struct drbd_work *w, int cancel)
1376 {
1377         struct drbd_device *device =
1378                 container_of(w, struct drbd_device, unplug_work);
1379         struct drbd_socket *sock;
1380
1381         if (cancel)
1382                 return 0;
1383         sock = &first_peer_device(device)->connection->data;
1384         if (!drbd_prepare_command(first_peer_device(device), sock))
1385                 return -EIO;
1386         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1387 }
1388
1389 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1390 {
1391         if (!connection->send.seen_any_write_yet) {
1392                 connection->send.seen_any_write_yet = true;
1393                 connection->send.current_epoch_nr = epoch;
1394                 connection->send.current_epoch_writes = 0;
1395                 connection->send.last_sent_barrier_jif = jiffies;
1396         }
1397 }
1398
1399 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1400 {
1401         /* nothing to do before we have seen the first write on this connection */
1402         if (!connection->send.seen_any_write_yet)
1403                 return;
1404         if (connection->send.current_epoch_nr != epoch) {
1405                 if (connection->send.current_epoch_writes)
1406                         drbd_send_barrier(connection);
1407                 connection->send.current_epoch_nr = epoch;
1408         }
1409 }
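/*
 * Rough sketch of how re_init_if_first_write() and maybe_send_barrier()
 * cooperate on the sender side (epoch numbers are examples):
 *
 *   write A, epoch 7: re_init sets current_epoch_nr = 7, writes = 0;
 *                     maybe_send_barrier() sees the same epoch, does nothing;
 *                     w_send_dblock() then counts the write (writes = 1).
 *   write B, epoch 7: same epoch, no barrier, writes = 2.
 *   write C, epoch 8: epoch changed and writes != 0, so a P_BARRIER for
 *                     epoch 7 is sent before the P_DATA for C, and
 *                     current_epoch_nr becomes 8.
 *
 * Read requests and out-of-sync notifications may close the previous write
 * epoch this way, but are never counted in current_epoch_writes.
 */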
1410
1411 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1412 {
1413         struct drbd_request *req = container_of(w, struct drbd_request, w);
1414         struct drbd_device *device = req->device;
1415         struct drbd_peer_device *const peer_device = first_peer_device(device);
1416         struct drbd_connection *const connection = peer_device->connection;
1417         int err;
1418
1419         if (unlikely(cancel)) {
1420                 req_mod(req, SEND_CANCELED);
1421                 return 0;
1422         }
1423         req->pre_send_jif = jiffies;
1424
1425         /* this time, no connection->send.current_epoch_writes++;
1426          * If a barrier gets sent here, it is the closing barrier for the
1427          * last replicated epoch, before we went into AHEAD mode.
1428          * No more barriers will be sent until we leave AHEAD mode again. */
1429         maybe_send_barrier(connection, req->epoch);
1430
1431         err = drbd_send_out_of_sync(peer_device, req);
1432         req_mod(req, OOS_HANDED_TO_NETWORK);
1433
1434         return err;
1435 }
1436
1437 /**
1438  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1439  * @w:          work object.
1440  * @cancel:     The connection will be closed anyway
1441  */
1442 int w_send_dblock(struct drbd_work *w, int cancel)
1443 {
1444         struct drbd_request *req = container_of(w, struct drbd_request, w);
1445         struct drbd_device *device = req->device;
1446         struct drbd_peer_device *const peer_device = first_peer_device(device);
1447         struct drbd_connection *connection = peer_device->connection;
1448         int err;
1449
1450         if (unlikely(cancel)) {
1451                 req_mod(req, SEND_CANCELED);
1452                 return 0;
1453         }
1454         req->pre_send_jif = jiffies;
1455
1456         re_init_if_first_write(connection, req->epoch);
1457         maybe_send_barrier(connection, req->epoch);
1458         connection->send.current_epoch_writes++;
1459
1460         err = drbd_send_dblock(peer_device, req);
1461         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1462
1463         return err;
1464 }
1465
1466 /**
1467  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1468  * @w:          work object.
1469  * @cancel:     The connection will be closed anyway
1470  */
1471 int w_send_read_req(struct drbd_work *w, int cancel)
1472 {
1473         struct drbd_request *req = container_of(w, struct drbd_request, w);
1474         struct drbd_device *device = req->device;
1475         struct drbd_peer_device *const peer_device = first_peer_device(device);
1476         struct drbd_connection *connection = peer_device->connection;
1477         int err;
1478
1479         if (unlikely(cancel)) {
1480                 req_mod(req, SEND_CANCELED);
1481                 return 0;
1482         }
1483         req->pre_send_jif = jiffies;
1484
1485         /* Even read requests may close a write epoch,
1486          * if one is still open. */
1487         maybe_send_barrier(connection, req->epoch);
1488
1489         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1490                                  (unsigned long)req);
1491
1492         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1493
1494         return err;
1495 }
1496
1497 int w_restart_disk_io(struct drbd_work *w, int cancel)
1498 {
1499         struct drbd_request *req = container_of(w, struct drbd_request, w);
1500         struct drbd_device *device = req->device;
1501
1502         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1503                 drbd_al_begin_io(device, &req->i);
1504
1505         drbd_req_make_private_bio(req, req->master_bio);
1506         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1507         generic_make_request(req->private_bio);
1508
1509         return 0;
1510 }
1511
1512 static int _drbd_may_sync_now(struct drbd_device *device)
1513 {
1514         struct drbd_device *odev = device;
1515         int resync_after;
1516
1517         while (1) {
1518                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1519                         return 1;
1520                 rcu_read_lock();
1521                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1522                 rcu_read_unlock();
1523                 if (resync_after == -1)
1524                         return 1;
1525                 odev = minor_to_device(resync_after);
1526                 if (!odev)
1527                         return 1;
1528                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1529                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1530                     odev->state.aftr_isp || odev->state.peer_isp ||
1531                     odev->state.user_isp)
1532                         return 0;
1533         }
1534 }
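/*
 * Illustration of the dependency walk above: if device B is configured to
 * resync after device A (disk_conf->resync_after holds A's minor number),
 * _drbd_may_sync_now(B) looks up A via minor_to_device() and returns 0
 * ("must not resync now") while A itself is between C_SYNC_SOURCE and
 * C_PAUSED_SYNC_T or has one of its isp bits set; it returns 1 as soon as
 * the chain ends (resync_after == -1, diskless, or missing minor).  Chains
 * may be longer than one hop.
 */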
1535
1536 /**
1537  * drbd_pause_after() - Pause resync on all devices that may not resync now
1538  * @device:     DRBD device.
1539  *
1540  * Called from process context only (admin command and after_state_ch).
1541  */
1542 static bool drbd_pause_after(struct drbd_device *device)
1543 {
1544         bool changed = false;
1545         struct drbd_device *odev;
1546         int i;
1547
1548         rcu_read_lock();
1549         idr_for_each_entry(&drbd_devices, odev, i) {
1550                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1551                         continue;
1552                 if (!_drbd_may_sync_now(odev) &&
1553                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1554                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1555                         changed = true;
1556         }
1557         rcu_read_unlock();
1558
1559         return changed;
1560 }
1561
1562 /**
1563  * drbd_resume_next() - Resume resync on all devices that may resync now
1564  * @device:     DRBD device.
1565  *
1566  * Called from process context only (admin command and worker).
1567  */
1568 static bool drbd_resume_next(struct drbd_device *device)
1569 {
1570         bool changed = false;
1571         struct drbd_device *odev;
1572         int i;
1573
1574         rcu_read_lock();
1575         idr_for_each_entry(&drbd_devices, odev, i) {
1576                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1577                         continue;
1578                 if (odev->state.aftr_isp) {
1579                         if (_drbd_may_sync_now(odev) &&
1580                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1581                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1582                                 changed = true;
1583                 }
1584         }
1585         rcu_read_unlock();
1586         return changed;
1587 }
1588
1589 void resume_next_sg(struct drbd_device *device)
1590 {
1591         lock_all_resources();
1592         drbd_resume_next(device);
1593         unlock_all_resources();
1594 }
1595
1596 void suspend_other_sg(struct drbd_device *device)
1597 {
1598         lock_all_resources();
1599         drbd_pause_after(device);
1600         unlock_all_resources();
1601 }
1602
1603 /* caller must lock_all_resources() */
1604 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1605 {
1606         struct drbd_device *odev;
1607         int resync_after;
1608
1609         if (o_minor == -1)
1610                 return NO_ERROR;
1611         if (o_minor < -1 || o_minor > MINORMASK)
1612                 return ERR_RESYNC_AFTER;
1613
1614         /* check for loops */
1615         odev = minor_to_device(o_minor);
1616         while (1) {
1617                 if (odev == device)
1618                         return ERR_RESYNC_AFTER_CYCLE;
1619
1620                 /* You are free to depend on diskless, non-existing,
1621                  * or not yet/no longer existing minors.
1622                  * We only reject dependency loops.
1623                  * We cannot follow the dependency chain beyond a detached or
1624                  * missing minor.
1625                  */
1626                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1627                         return NO_ERROR;
1628
1629                 rcu_read_lock();
1630                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1631                 rcu_read_unlock();
1632                 /* dependency chain ends here, no cycles. */
1633                 if (resync_after == -1)
1634                         return NO_ERROR;
1635
1636                 /* follow the dependency chain */
1637                 odev = minor_to_device(resync_after);
1638         }
1639 }
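/*
 * Example of what the cycle check rejects: with two devices A and B, making
 * A resync after B while B already resyncs after A would let
 * _drbd_may_sync_now() walk the chain forever, hence ERR_RESYNC_AFTER_CYCLE.
 * A chain that merely ends at a diskless or not (yet) existing minor is
 * fine and yields NO_ERROR.
 */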
1640
1641 /* caller must lock_all_resources() */
1642 void drbd_resync_after_changed(struct drbd_device *device)
1643 {
1644         int changed;
1645
1646         do {
1647                 changed  = drbd_pause_after(device);
1648                 changed |= drbd_resume_next(device);
1649         } while (changed);
1650 }
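/*
 * The loop runs to a fixed point on purpose: clearing aftr_isp on one
 * device in drbd_resume_next() (or setting it in drbd_pause_after()) can
 * change the outcome of _drbd_may_sync_now() for a device further down the
 * resync-after chain, so we iterate until a full pass changes nothing.
 */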
1651
1652 void drbd_rs_controller_reset(struct drbd_device *device)
1653 {
1654         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1655         struct fifo_buffer *plan;
1656
1657         atomic_set(&device->rs_sect_in, 0);
1658         atomic_set(&device->rs_sect_ev, 0);
1659         device->rs_in_flight = 0;
1660         device->rs_last_events =
1661                 (int)part_stat_read(&disk->part0, sectors[0]) +
1662                 (int)part_stat_read(&disk->part0, sectors[1]);
1663
1664         /* Updating the RCU protected object in place is necessary since
1665            this function gets called from atomic context.
1666            It is valid since all other updates also lead to a completely
1667            empty fifo */
1668         rcu_read_lock();
1669         plan = rcu_dereference(device->rs_plan_s);
1670         plan->total = 0;
1671         fifo_set(plan, 0);
1672         rcu_read_unlock();
1673 }
1674
1675 void start_resync_timer_fn(unsigned long data)
1676 {
1677         struct drbd_device *device = (struct drbd_device *) data;
1678         drbd_device_post_work(device, RS_START);
1679 }
1680
1681 static void do_start_resync(struct drbd_device *device)
1682 {
1683         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1684                 drbd_warn(device, "postponing start_resync ...\n");
1685                 device->start_resync_timer.expires = jiffies + HZ/10;
1686                 add_timer(&device->start_resync_timer);
1687                 return;
1688         }
1689
1690         drbd_start_resync(device, C_SYNC_SOURCE);
1691         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1692 }
1693
1694 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1695 {
1696         bool csums_after_crash_only;
1697         rcu_read_lock();
1698         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1699         rcu_read_unlock();
1700         return connection->agreed_pro_version >= 89 &&          /* supported? */
1701                 connection->csums_tfm &&                        /* configured? */
1702                 (csums_after_crash_only == false                /* use for each resync? */
1703                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1704 }
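/*
 * In configuration terms this roughly corresponds to: both peers speak
 * protocol 89 or newer, a checksum algorithm is configured (e.g. via the
 * "csums-alg" net option, which is what sets up csums_tfm), and either
 * checksum based resync is wanted for every resync or, with
 * "csums-after-crash-only", only while CRASHED_PRIMARY is set.
 */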
1705
1706 /**
1707  * drbd_start_resync() - Start the resync process
1708  * @device:     DRBD device.
1709  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1710  *
1711  * This function might bring you directly into one of the
1712  * C_PAUSED_SYNC_* states.
1713  */
1714 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1715 {
1716         struct drbd_peer_device *peer_device = first_peer_device(device);
1717         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1718         union drbd_state ns;
1719         int r;
1720
1721         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1722                 drbd_err(device, "Resync already running!\n");
1723                 return;
1724         }
1725
1726         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1727                 if (side == C_SYNC_TARGET) {
1728                         /* Since application IO was locked out during C_WF_BITMAP_T and
1729                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1730                            we check whether we are allowed to make the data inconsistent. */
1731                         r = drbd_khelper(device, "before-resync-target");
1732                         r = (r >> 8) & 0xff;
1733                         if (r > 0) {
1734                                 drbd_info(device, "before-resync-target handler returned %d, "
1735                                          "dropping connection.\n", r);
1736                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1737                                 return;
1738                         }
1739                 } else /* C_SYNC_SOURCE */ {
1740                         r = drbd_khelper(device, "before-resync-source");
1741                         r = (r >> 8) & 0xff;
1742                         if (r > 0) {
1743                                 if (r == 3) {
1744                                         drbd_info(device, "before-resync-source handler returned %d, "
1745                                                  "ignoring. Old userland tools?", r);
1746                                 } else {
1747                                         drbd_info(device, "before-resync-source handler returned %d, "
1748                                                  "dropping connection.\n", r);
1749                                         conn_request_state(connection,
1750                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1751                                         return;
1752                                 }
1753                         }
1754                 }
1755         }
1756
1757         if (current == connection->worker.task) {
1758                 /* The worker should not sleep waiting for state_mutex,
1759                    as that can take a long time */
1760                 if (!mutex_trylock(device->state_mutex)) {
1761                         set_bit(B_RS_H_DONE, &device->flags);
1762                         device->start_resync_timer.expires = jiffies + HZ/5;
1763                         add_timer(&device->start_resync_timer);
1764                         return;
1765                 }
1766         } else {
1767                 mutex_lock(device->state_mutex);
1768         }
1769
1770         lock_all_resources();
1771         clear_bit(B_RS_H_DONE, &device->flags);
1772         /* Did some connection breakage or IO error race with us? */
1773         if (device->state.conn < C_CONNECTED
1774         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1775                 unlock_all_resources();
1776                 goto out;
1777         }
1778
1779         ns = drbd_read_state(device);
1780
1781         ns.aftr_isp = !_drbd_may_sync_now(device);
1782
1783         ns.conn = side;
1784
1785         if (side == C_SYNC_TARGET)
1786                 ns.disk = D_INCONSISTENT;
1787         else /* side == C_SYNC_SOURCE */
1788                 ns.pdsk = D_INCONSISTENT;
1789
1790         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1791         ns = drbd_read_state(device);
1792
1793         if (ns.conn < C_CONNECTED)
1794                 r = SS_UNKNOWN_ERROR;
1795
1796         if (r == SS_SUCCESS) {
1797                 unsigned long tw = drbd_bm_total_weight(device);
1798                 unsigned long now = jiffies;
1799                 int i;
1800
1801                 device->rs_failed    = 0;
1802                 device->rs_paused    = 0;
1803                 device->rs_same_csum = 0;
1804                 device->rs_last_sect_ev = 0;
1805                 device->rs_total     = tw;
1806                 device->rs_start     = now;
1807                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1808                         device->rs_mark_left[i] = tw;
1809                         device->rs_mark_time[i] = now;
1810                 }
1811                 drbd_pause_after(device);
1812                 /* Forget potentially stale cached per resync extent bit-counts.
1813                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1814                  * disabled, and know the disk state is ok. */
1815                 spin_lock(&device->al_lock);
1816                 lc_reset(device->resync);
1817                 device->resync_locked = 0;
1818                 device->resync_wenr = LC_FREE;
1819                 spin_unlock(&device->al_lock);
1820         }
1821         unlock_all_resources();
1822
1823         if (r == SS_SUCCESS) {
1824                 wake_up(&device->al_wait); /* for lc_reset() above */
1825                 /* reset rs_last_bcast when a resync or verify is started,
1826                  * to deal with potential jiffies wrap. */
1827                 device->rs_last_bcast = jiffies - HZ;
1828
1829                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1830                      drbd_conn_str(ns.conn),
1831                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1832                      (unsigned long) device->rs_total);
1833                 if (side == C_SYNC_TARGET) {
1834                         device->bm_resync_fo = 0;
1835                         device->use_csums = use_checksum_based_resync(connection, device);
1836                 } else {
1837                         device->use_csums = false;
1838                 }
1839
1840                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1841                  * with w_send_oos, or the sync target will get confused as to
1842                  * how many bits to resync.  We cannot do that always, because for an
1843                  * empty resync and protocol < 95, we need to do it here, as we call
1844                  * drbd_resync_finished from here in that case.
1845                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1846                  * and from after_state_ch otherwise. */
1847                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1848                         drbd_gen_and_send_sync_uuid(peer_device);
1849
1850                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1851                         /* This still has a race (about when exactly the peers
1852                          * detect connection loss) that can lead to a full sync
1853                          * on next handshake. In 8.3.9 we fixed this with explicit
1854                          * resync-finished notifications, but the fix
1855                          * introduces a protocol change.  Sleeping for some
1856                          * time longer than the ping interval + timeout on the
1857                          * SyncSource, to give the SyncTarget the chance to
1858                          * detect connection loss, then waiting for a ping
1859                          * response (implicit in drbd_resync_finished) reduces
1860                          * the race considerably, but does not solve it. */
1861                         if (side == C_SYNC_SOURCE) {
1862                                 struct net_conf *nc;
1863                                 int timeo;
1864
1865                                 rcu_read_lock();
1866                                 nc = rcu_dereference(connection->net_conf);
1867                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1868                                 rcu_read_unlock();
1869                                 schedule_timeout_interruptible(timeo);
1870                         }
1871                         drbd_resync_finished(device);
1872                 }
1873
1874                 drbd_rs_controller_reset(device);
1875                 /* ns.conn may already be != device->state.conn,
1876                  * we may have been paused in between, or become paused until
1877                  * the timer triggers.
1878                  * No matter, that is handled in resync_timer_fn() */
1879                 if (ns.conn == C_SYNC_TARGET)
1880                         mod_timer(&device->resync_timer, jiffies);
1881
1882                 drbd_md_sync(device);
1883         }
1884         put_ldev(device);
1885 out:
1886         mutex_unlock(device->state_mutex);
1887 }
1888
1889 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1890 {
1891         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1892         device->rs_last_bcast = jiffies;
1893
1894         if (!get_ldev(device))
1895                 return;
1896
1897         drbd_bm_write_lazy(device, 0);
1898         if (resync_done && is_sync_state(device->state.conn))
1899                 drbd_resync_finished(device);
1900
1901         drbd_bcast_event(device, &sib);
1902         /* update timestamp, in case it took a while to write out stuff */
1903         device->rs_last_bcast = jiffies;
1904         put_ldev(device);
1905 }
1906
1907 static void drbd_ldev_destroy(struct drbd_device *device)
1908 {
1909         lc_destroy(device->resync);
1910         device->resync = NULL;
1911         lc_destroy(device->act_log);
1912         device->act_log = NULL;
1913
1914         __acquire(local);
1915         drbd_backing_dev_free(device, device->ldev);
1916         device->ldev = NULL;
1917         __release(local);
1918
1919         clear_bit(GOING_DISKLESS, &device->flags);
1920         wake_up(&device->misc_wait);
1921 }
1922
1923 static void go_diskless(struct drbd_device *device)
1924 {
1925         D_ASSERT(device, device->state.disk == D_FAILED);
1926         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1927          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1928          * the protected members anymore, though, so once local_cnt drops to zero
1929          * again (via put_ldev), it will be safe to free them. */
1930
1931         /* Try to write changed bitmap pages, read errors may have just
1932          * set some bits outside the area covered by the activity log.
1933          *
1934          * If we have an IO error during the bitmap writeout,
1935          * we will want a full sync next time, just in case.
1936          * (Do we want a specific meta data flag for this?)
1937          *
1938          * If that does not make it to stable storage either,
1939          * we cannot do anything about that anymore.
1940          *
1941          * We still need to check if both bitmap and ldev are present, we may
1942          * end up here after a failed attach, before ldev was even assigned.
1943          */
1944         if (device->bitmap && device->ldev) {
1945                 /* An interrupted resync or similar is allowed to recount bits
1946                  * while we detach.
1947                  * Any modifications would not be expected anymore, though.
1948                  */
1949                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1950                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1951                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1952                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1953                                 drbd_md_sync(device);
1954                         }
1955                 }
1956         }
1957
1958         drbd_force_state(device, NS(disk, D_DISKLESS));
1959 }
1960
1961 static int do_md_sync(struct drbd_device *device)
1962 {
1963         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1964         drbd_md_sync(device);
1965         return 0;
1966 }
1967
1968 /* only called from drbd_worker thread, no locking */
1969 void __update_timing_details(
1970                 struct drbd_thread_timing_details *tdp,
1971                 unsigned int *cb_nr,
1972                 void *cb,
1973                 const char *fn, const unsigned int line)
1974 {
1975         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1976         struct drbd_thread_timing_details *td = tdp + i;
1977
1978         td->start_jif = jiffies;
1979         td->cb_addr = cb;
1980         td->caller_fn = fn;
1981         td->line = line;
1982         td->cb_nr = *cb_nr;
1983
1984         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1985         td = tdp + i;
1986         memset(td, 0, sizeof(*td));
1987
1988         ++(*cb_nr);
1989 }
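/*
 * tdp[] thus acts as a small ring buffer of the last DRBD_THREAD_DETAILS_HIST
 * callbacks run by the worker: the current slot records callback address,
 * caller and jiffies, and the slot after it is zeroed, presumably so that
 * whoever dumps the history can tell where the newest entry ends.
 */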
1990
1991 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1992 {
1993         if (test_bit(MD_SYNC, &todo))
1994                 do_md_sync(device);
1995         if (test_bit(RS_DONE, &todo) ||
1996             test_bit(RS_PROGRESS, &todo))
1997                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1998         if (test_bit(GO_DISKLESS, &todo))
1999                 go_diskless(device);
2000         if (test_bit(DESTROY_DISK, &todo))
2001                 drbd_ldev_destroy(device);
2002         if (test_bit(RS_START, &todo))
2003                 do_start_resync(device);
2004 }
2005
2006 #define DRBD_DEVICE_WORK_MASK   \
2007         ((1UL << GO_DISKLESS)   \
2008         |(1UL << DESTROY_DISK)  \
2009         |(1UL << MD_SYNC)       \
2010         |(1UL << RS_START)      \
2011         |(1UL << RS_PROGRESS)   \
2012         |(1UL << RS_DONE)       \
2013         )
2014
2015 static unsigned long get_work_bits(unsigned long *flags)
2016 {
2017         unsigned long old, new;
2018         do {
2019                 old = *flags;
2020                 new = old & ~DRBD_DEVICE_WORK_MASK;
2021         } while (cmpxchg(flags, old, new) != old);
2022         return old & DRBD_DEVICE_WORK_MASK;
2023 }
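/*
 * get_work_bits() is a lock free "fetch and clear" of the device work bits:
 * the cmpxchg() loop retries whenever some other context modified *flags
 * between the read and the exchange, so every queued work bit is claimed by
 * exactly one do_device_work() pass and bits outside DRBD_DEVICE_WORK_MASK
 * are left untouched.
 */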
2024
2025 static void do_unqueued_work(struct drbd_connection *connection)
2026 {
2027         struct drbd_peer_device *peer_device;
2028         int vnr;
2029
2030         rcu_read_lock();
2031         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2032                 struct drbd_device *device = peer_device->device;
2033                 unsigned long todo = get_work_bits(&device->flags);
2034                 if (!todo)
2035                         continue;
2036
2037                 kref_get(&device->kref);
2038                 rcu_read_unlock();
2039                 do_device_work(device, todo);
2040                 kref_put(&device->kref, drbd_destroy_device);
2041                 rcu_read_lock();
2042         }
2043         rcu_read_unlock();
2044 }
2045
2046 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2047 {
2048         spin_lock_irq(&queue->q_lock);
2049         list_splice_tail_init(&queue->q, work_list);
2050         spin_unlock_irq(&queue->q_lock);
2051         return !list_empty(work_list);
2052 }
2053
2054 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2055 {
2056         DEFINE_WAIT(wait);
2057         struct net_conf *nc;
2058         int uncork, cork;
2059
2060         dequeue_work_batch(&connection->sender_work, work_list);
2061         if (!list_empty(work_list))
2062                 return;
2063
2064         /* Still nothing to do?
2065          * Maybe we still need to close the current epoch,
2066          * even if no new requests are queued yet.
2067          *
2068          * Also, poke TCP, just in case.
2069          * Then wait for new work (or signal). */
2070         rcu_read_lock();
2071         nc = rcu_dereference(connection->net_conf);
2072         uncork = nc ? nc->tcp_cork : 0;
2073         rcu_read_unlock();
2074         if (uncork) {
2075                 mutex_lock(&connection->data.mutex);
2076                 if (connection->data.socket)
2077                         drbd_tcp_uncork(connection->data.socket);
2078                 mutex_unlock(&connection->data.mutex);
2079         }
2080
2081         for (;;) {
2082                 int send_barrier;
2083                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2084                 spin_lock_irq(&connection->resource->req_lock);
2085                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2086                 if (!list_empty(&connection->sender_work.q))
2087                         list_splice_tail_init(&connection->sender_work.q, work_list);
2088                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2089                 if (!list_empty(work_list) || signal_pending(current)) {
2090                         spin_unlock_irq(&connection->resource->req_lock);
2091                         break;
2092                 }
2093
2094                 /* We found nothing new to do, no to-be-communicated request,
2095                  * no other work item.  We may still need to close the last
2096                  * epoch.  Next incoming request epoch will be connection ->
2097                  * current transfer log epoch number.  If that is different
2098                  * from the epoch of the last request we communicated, it is
2099                  * safe to send the epoch separating barrier now.
2100                  */
2101                 send_barrier =
2102                         atomic_read(&connection->current_tle_nr) !=
2103                         connection->send.current_epoch_nr;
2104                 spin_unlock_irq(&connection->resource->req_lock);
2105
2106                 if (send_barrier)
2107                         maybe_send_barrier(connection,
2108                                         connection->send.current_epoch_nr + 1);
2109
2110                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2111                         break;
2112
2113                 /* drbd_send() may have called flush_signals() */
2114                 if (get_t_state(&connection->worker) != RUNNING)
2115                         break;
2116
2117                 schedule();
2118                 /* may be woken up for things other than new work, too,
2119                  * e.g. if the current epoch got closed.
2120                  * In which case we send the barrier above. */
2121         }
2122         finish_wait(&connection->sender_work.q_wait, &wait);
2123
2124         /* someone may have changed the config while we have been waiting above. */
2125         rcu_read_lock();
2126         nc = rcu_dereference(connection->net_conf);
2127         cork = nc ? nc->tcp_cork : 0;
2128         rcu_read_unlock();
2129         mutex_lock(&connection->data.mutex);
2130         if (connection->data.socket) {
2131                 if (cork)
2132                         drbd_tcp_cork(connection->data.socket);
2133                 else if (!uncork)
2134                         drbd_tcp_uncork(connection->data.socket);
2135         }
2136         mutex_unlock(&connection->data.mutex);
2137 }
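/*
 * The cork/uncork handling around the wait trades latency for batching:
 * while the sender is idle the data socket is uncorked so the peer sees
 * whatever was queued so far, and once there is new work it is corked again
 * (if tcp_cork is enabled in net_conf) so that several small packets from
 * the freshly dequeued work_list can be coalesced before hitting the wire.
 */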
2138
2139 int drbd_worker(struct drbd_thread *thi)
2140 {
2141         struct drbd_connection *connection = thi->connection;
2142         struct drbd_work *w = NULL;
2143         struct drbd_peer_device *peer_device;
2144         LIST_HEAD(work_list);
2145         int vnr;
2146
2147         while (get_t_state(thi) == RUNNING) {
2148                 drbd_thread_current_set_cpu(thi);
2149
2150                 if (list_empty(&work_list)) {
2151                         update_worker_timing_details(connection, wait_for_work);
2152                         wait_for_work(connection, &work_list);
2153                 }
2154
2155                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2156                         update_worker_timing_details(connection, do_unqueued_work);
2157                         do_unqueued_work(connection);
2158                 }
2159
2160                 if (signal_pending(current)) {
2161                         flush_signals(current);
2162                         if (get_t_state(thi) == RUNNING) {
2163                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2164                                 continue;
2165                         }
2166                         break;
2167                 }
2168
2169                 if (get_t_state(thi) != RUNNING)
2170                         break;
2171
2172                 if (!list_empty(&work_list)) {
2173                         w = list_first_entry(&work_list, struct drbd_work, list);
2174                         list_del_init(&w->list);
2175                         update_worker_timing_details(connection, w->cb);
2176                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2177                                 continue;
2178                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2179                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2180                 }
2181         }
2182
2183         do {
2184                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2185                         update_worker_timing_details(connection, do_unqueued_work);
2186                         do_unqueued_work(connection);
2187                 }
2188                 if (!list_empty(&work_list)) {
2189                         w = list_first_entry(&work_list, struct drbd_work, list);
2190                         list_del_init(&w->list);
2191                         update_worker_timing_details(connection, w->cb);
2192                         w->cb(w, 1);
2193                 } else
2194                         dequeue_work_batch(&connection->sender_work, &work_list);
2195         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2196
2197         rcu_read_lock();
2198         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2199                 struct drbd_device *device = peer_device->device;
2200                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2201                 kref_get(&device->kref);
2202                 rcu_read_unlock();
2203                 drbd_device_cleanup(device);
2204                 kref_put(&device->kref, drbd_destroy_device);
2205                 rcu_read_lock();
2206         }
2207         rcu_read_unlock();
2208
2209         return 0;
2210 }