1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int w_make_ov_request(struct drbd_work *, int);
43
44
45 /* endio handlers:
46  *   drbd_md_io_complete (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   bm_async_io_complete (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58
59 /* About the global_state_lock
60    Each state transition on a device holds a read lock. In case we have
61    to evaluate the resync-after dependencies, we grab a write lock, because
62    we need stable states on all devices for that.  */
63 rwlock_t global_state_lock;
64
65 /* used for synchronous meta data and bitmap IO
66  * submitted by drbd_md_sync_page_io()
67  */
68 void drbd_md_io_complete(struct bio *bio, int error)
69 {
70         struct drbd_md_io *md_io;
71         struct drbd_device *device;
72
73         md_io = (struct drbd_md_io *)bio->bi_private;
74         device = container_of(md_io, struct drbd_device, md_io);
75
76         md_io->error = error;
77
78         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
79          * to timeout on the lower level device, and eventually detach from it.
80          * If this io completion runs after that timeout expired, this
81          * drbd_md_put_buffer() may allow us to finally try and re-attach.
82          * During normal operation, this only puts that extra reference
83          * down to 1 again.
84          * Make sure we first drop the reference, and only then signal
85          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
86          * next drbd_md_sync_page_io(), that we trigger the
87          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
88          */
89         drbd_md_put_buffer(device);
90         md_io->done = 1;
91         wake_up(&device->misc_wait);
92         bio_put(bio);
93         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
94                 put_ldev(device);
95 }
96
97 /* reads on behalf of the partner,
98  * "submitted" by the receiver
99  */
100 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
101 {
102         unsigned long flags = 0;
103         struct drbd_device *device = peer_req->dw.device;
104
105         spin_lock_irqsave(&device->resource->req_lock, flags);
106         device->read_cnt += peer_req->i.size >> 9;
107         list_del(&peer_req->dw.w.list);
108         if (list_empty(&device->read_ee))
109                 wake_up(&device->ee_wait);
110         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
111                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
112         spin_unlock_irqrestore(&device->resource->req_lock, flags);
113
114         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
115                         &peer_req->dw.w);
116         put_ldev(device);
117 }
118
119 /* writes on behalf of the partner, or resync writes,
120  * "submitted" by the receiver, final stage.  */
121 static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
122 {
123         unsigned long flags = 0;
124         struct drbd_device *device = peer_req->dw.device;
125         struct drbd_interval i;
126         int do_wake;
127         u64 block_id;
128         int do_al_complete_io;
129
130         /* after we moved peer_req to done_ee,
131          * we may no longer access it,
132          * it may be freed/reused already!
133          * (as soon as we release the req_lock) */
134         i = peer_req->i;
135         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
136         block_id = peer_req->block_id;
137
138         spin_lock_irqsave(&device->resource->req_lock, flags);
139         device->writ_cnt += peer_req->i.size >> 9;
140         list_move_tail(&peer_req->dw.w.list, &device->done_ee);
141
142         /*
143          * Do not remove from the write_requests tree here: we did not send the
144          * Ack yet and did not wake possibly waiting conflicting requests.
145          * Removal from the tree happens in "drbd_process_done_ee" within the
146          * appropriate dw.cb (e_end_block/e_end_resync_block) or in
147          * _drbd_clear_done_ee.
148          */
149
150         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
151
152         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
153                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
154         spin_unlock_irqrestore(&device->resource->req_lock, flags);
155
156         if (block_id == ID_SYNCER)
157                 drbd_rs_complete_io(device, i.sector);
158
159         if (do_wake)
160                 wake_up(&device->ee_wait);
161
162         if (do_al_complete_io)
163                 drbd_al_complete_io(device, &i);
164
165         wake_asender(first_peer_device(device)->connection);
166         put_ldev(device);
167 }
168
169 /* writes on behalf of the partner, or resync writes,
170  * "submitted" by the receiver.
171  */
172 void drbd_peer_request_endio(struct bio *bio, int error)
173 {
174         struct drbd_peer_request *peer_req = bio->bi_private;
175         struct drbd_device *device = peer_req->dw.device;
176         int uptodate = bio_flagged(bio, BIO_UPTODATE);
177         int is_write = bio_data_dir(bio) == WRITE;
178
179         if (error && __ratelimit(&drbd_ratelimit_state))
180                 drbd_warn(device, "%s: error=%d s=%llus\n",
181                                 is_write ? "write" : "read", error,
182                                 (unsigned long long)peer_req->i.sector);
183         if (!error && !uptodate) {
184                 if (__ratelimit(&drbd_ratelimit_state))
185                         drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
186                                         is_write ? "write" : "read",
187                                         (unsigned long long)peer_req->i.sector);
188                 /* strange behavior of some lower level drivers...
189                  * fail the request by clearing the uptodate flag,
190                  * but do not return any error?! */
191                 error = -EIO;
192         }
193
194         if (error)
195                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
196
197         bio_put(bio); /* no need for the bio anymore */
198         if (atomic_dec_and_test(&peer_req->pending_bios)) {
199                 if (is_write)
200                         drbd_endio_write_sec_final(peer_req);
201                 else
202                         drbd_endio_read_sec_final(peer_req);
203         }
204 }
205
206 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
207  */
208 void drbd_request_endio(struct bio *bio, int error)
209 {
210         unsigned long flags;
211         struct drbd_request *req = bio->bi_private;
212         struct drbd_device *device = req->device;
213         struct bio_and_error m;
214         enum drbd_req_event what;
215         int uptodate = bio_flagged(bio, BIO_UPTODATE);
216
217         if (!error && !uptodate) {
218                 drbd_warn(device, "p %s: setting error to -EIO\n",
219                          bio_data_dir(bio) == WRITE ? "write" : "read");
220                 /* strange behavior of some lower level drivers...
221                  * fail the request by clearing the uptodate flag,
222                  * but do not return any error?! */
223                 error = -EIO;
224         }
225
226
227         /* If this request was aborted locally before,
228          * but now was completed "successfully",
229          * chances are that this caused arbitrary data corruption.
230          *
231          * "aborting" requests, or force-detaching the disk, is intended for
232          * completely blocked/hung local backing devices which do no longer
233          * complete requests at all, not even do error completions.  In this
234          * situation, usually a hard-reset and failover is the only way out.
235          *
236          * By "aborting", basically faking a local error-completion,
237          * we allow for a more graceful switchover by cleanly migrating services.
238          * Still the affected node has to be rebooted "soon".
239          *
240          * By completing these requests, we allow the upper layers to re-use
241          * the associated data pages.
242          *
243          * If later the local backing device "recovers", and now DMAs some data
244          * from disk into the original request pages, in the best case it will
245          * just put random data into unused pages; but typically it will corrupt
246          * meanwhile completely unrelated data, causing all sorts of damage.
247          *
248          * Which means delayed successful completion,
249          * especially for READ requests,
250          * is a reason to panic().
251          *
252          * We assume that a delayed *error* completion is OK,
253          * though we still will complain noisily about it.
254          */
255         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
256                 if (__ratelimit(&drbd_ratelimit_state))
257                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
258
259                 if (!error)
260                         panic("possible random memory corruption caused by delayed completion of aborted local request\n");
261         }
262
263         /* to avoid recursion in __req_mod */
264         if (unlikely(error)) {
265                 what = (bio_data_dir(bio) == WRITE)
266                         ? WRITE_COMPLETED_WITH_ERROR
267                         : (bio_rw(bio) == READ)
268                           ? READ_COMPLETED_WITH_ERROR
269                           : READ_AHEAD_COMPLETED_WITH_ERROR;
270         } else
271                 what = COMPLETED_OK;
272
273         bio_put(req->private_bio);
274         req->private_bio = ERR_PTR(error);
275
276         /* not req_mod(), we need irqsave here! */
277         spin_lock_irqsave(&device->resource->req_lock, flags);
278         __req_mod(req, what, &m);
279         spin_unlock_irqrestore(&device->resource->req_lock, flags);
280         put_ldev(device);
281
282         if (m.bio)
283                 complete_master_bio(device, &m);
284 }
285
286 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
287 {
288         struct hash_desc desc;
289         struct scatterlist sg;
290         struct page *page = peer_req->pages;
291         struct page *tmp;
292         unsigned len;
293
294         desc.tfm = tfm;
295         desc.flags = 0;
296
297         sg_init_table(&sg, 1);
298         crypto_hash_init(&desc);
299
300         while ((tmp = page_chain_next(page))) {
301                 /* all but the last page will be fully used */
302                 sg_set_page(&sg, page, PAGE_SIZE, 0);
303                 crypto_hash_update(&desc, &sg, sg.length);
304                 page = tmp;
305         }
306         /* and now the last, possibly only partially used page */
307         len = peer_req->i.size & (PAGE_SIZE - 1);
308         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
309         crypto_hash_update(&desc, &sg, sg.length);
310         crypto_hash_final(&desc, digest);
311 }
312
313 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
314 {
315         struct hash_desc desc;
316         struct scatterlist sg;
317         struct bio_vec bvec;
318         struct bvec_iter iter;
319
320         desc.tfm = tfm;
321         desc.flags = 0;
322
323         sg_init_table(&sg, 1);
324         crypto_hash_init(&desc);
325
326         bio_for_each_segment(bvec, bio, iter) {
327                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
328                 crypto_hash_update(&desc, &sg, sg.length);
329         }
330         crypto_hash_final(&desc, digest);
331 }
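/* Illustrative sketch only (not part of the driver logic above): both csum
 * helpers expect a synchronous hash transform from the legacy crypto_hash
 * API, as drbd allocates for csums_tfm/verify_tfm elsewhere.  The algorithm
 * name and variable names here are just placeholders.
 *
 *	struct crypto_hash *tfm = crypto_alloc_hash("md5", 0, CRYPTO_ALG_ASYNC);
 *	void *digest;
 *
 *	if (IS_ERR(tfm))
 *		return PTR_ERR(tfm);
 *	digest = kmalloc(crypto_hash_digestsize(tfm), GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_bio(tfm, bio, digest);
 *		kfree(digest);
 *	}
 *	crypto_free_hash(tfm);
 */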
332
333 /* MAYBE merge common code with w_e_end_ov_req */
334 static int w_e_send_csum(struct drbd_work *w, int cancel)
335 {
336         struct drbd_device_work *dw = device_work(w);
337         struct drbd_peer_request *peer_req = container_of(dw, struct drbd_peer_request, dw);
338         struct drbd_device *device = dw->device;
339         int digest_size;
340         void *digest;
341         int err = 0;
342
343         if (unlikely(cancel))
344                 goto out;
345
346         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
347                 goto out;
348
349         digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->csums_tfm);
350         digest = kmalloc(digest_size, GFP_NOIO);
351         if (digest) {
352                 sector_t sector = peer_req->i.sector;
353                 unsigned int size = peer_req->i.size;
354                 drbd_csum_ee(first_peer_device(device)->connection->csums_tfm, peer_req, digest);
355                 /* Free peer_req and pages before send.
356                  * In case we block on congestion, we could otherwise run into
357                  * some distributed deadlock, if the other side blocks on
358                  * congestion as well, because our receiver blocks in
359                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
360                 drbd_free_peer_req(device, peer_req);
361                 peer_req = NULL;
362                 inc_rs_pending(device);
363                 err = drbd_send_drequest_csum(first_peer_device(device), sector, size,
364                                               digest, digest_size,
365                                               P_CSUM_RS_REQUEST);
366                 kfree(digest);
367         } else {
368                 drbd_err(device, "kmalloc() of digest failed.\n");
369                 err = -ENOMEM;
370         }
371
372 out:
373         if (peer_req)
374                 drbd_free_peer_req(device, peer_req);
375
376         if (unlikely(err))
377                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
378         return err;
379 }
380
381 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
382
383 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
384 {
385         struct drbd_device *device = peer_device->device;
386         struct drbd_peer_request *peer_req;
387
388         if (!get_ldev(device))
389                 return -EIO;
390
391         if (drbd_rs_should_slow_down(device, sector))
392                 goto defer;
393
394         /* GFP_TRY, because if there is no memory available right now, this may
395          * be rescheduled for later. It is "only" background resync, after all. */
396         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
397                                        size, GFP_TRY);
398         if (!peer_req)
399                 goto defer;
400
401         peer_req->dw.w.cb = w_e_send_csum;
402         spin_lock_irq(&device->resource->req_lock);
403         list_add(&peer_req->dw.w.list, &device->read_ee);
404         spin_unlock_irq(&device->resource->req_lock);
405
406         atomic_add(size >> 9, &device->rs_sect_ev);
407         if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
408                 return 0;
409
410         /* If it failed because of ENOMEM, retry should help.  If it failed
411          * because bio_add_page failed (probably broken lower level driver),
412          * retry may or may not help.
413          * If it does not, you may need to force disconnect. */
414         spin_lock_irq(&device->resource->req_lock);
415         list_del(&peer_req->dw.w.list);
416         spin_unlock_irq(&device->resource->req_lock);
417
418         drbd_free_peer_req(device, peer_req);
419 defer:
420         put_ldev(device);
421         return -EAGAIN;
422 }
423
424 int w_resync_timer(struct drbd_work *w, int cancel)
425 {
426         struct drbd_device *device =
427                 container_of(w, struct drbd_device, resync_work);
428
429         switch (device->state.conn) {
430         case C_VERIFY_S:
431                 w_make_ov_request(w, cancel);
432                 break;
433         case C_SYNC_TARGET:
434                 w_make_resync_request(w, cancel);
435                 break;
436         }
437
438         return 0;
439 }
440
441 void resync_timer_fn(unsigned long data)
442 {
443         struct drbd_device *device = (struct drbd_device *) data;
444
445         if (list_empty(&device->resync_work.list))
446                 drbd_queue_work(&first_peer_device(device)->connection->sender_work,
447                                 &device->resync_work);
448 }
449
450 static void fifo_set(struct fifo_buffer *fb, int value)
451 {
452         int i;
453
454         for (i = 0; i < fb->size; i++)
455                 fb->values[i] = value;
456 }
457
458 static int fifo_push(struct fifo_buffer *fb, int value)
459 {
460         int ov;
461
462         ov = fb->values[fb->head_index];
463         fb->values[fb->head_index++] = value;
464
465         if (fb->head_index >= fb->size)
466                 fb->head_index = 0;
467
468         return ov;
469 }
470
471 static void fifo_add_val(struct fifo_buffer *fb, int value)
472 {
473         int i;
474
475         for (i = 0; i < fb->size; i++)
476                 fb->values[i] += value;
477 }
478
479 struct fifo_buffer *fifo_alloc(int fifo_size)
480 {
481         struct fifo_buffer *fb;
482
483         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
484         if (!fb)
485                 return NULL;
486
487         fb->head_index = 0;
488         fb->size = fifo_size;
489         fb->total = 0;
490
491         return fb;
492 }
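/* Minimal usage sketch of the plan FIFO (assumed values, illustration only):
 * fifo_add_val() spreads a correction over every planned step, while
 * fifo_push() pops the oldest planned amount and appends a fresh slot.
 *
 *	struct fifo_buffer *plan = fifo_alloc(10);	// 10 planning steps
 *	int curr;
 *
 *	if (!plan)
 *		return -ENOMEM;
 *	fifo_add_val(plan, 20);		// plan 20 extra sectors per step
 *	plan->total += 20 * 10;
 *	curr = fifo_push(plan, 0);	// amount planned for this step
 *	plan->total -= curr;
 *	kfree(plan);
 */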
493
494 static int drbd_rs_controller(struct drbd_device *device)
495 {
496         struct disk_conf *dc;
497         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
498         unsigned int want;     /* The number of sectors we want in the proxy */
499         int req_sect; /* Number of sectors to request in this turn */
500         int correction; /* Number of sectors more we need in the proxy */
501         int cps; /* correction per invocation of drbd_rs_controller() */
502         int steps; /* Number of time steps to plan ahead */
503         int curr_corr;
504         int max_sect;
505         struct fifo_buffer *plan;
506
507         sect_in = atomic_xchg(&device->rs_sect_in, 0); /* Number of sectors that came in */
508         device->rs_in_flight -= sect_in;
509
510         dc = rcu_dereference(device->ldev->disk_conf);
511         plan = rcu_dereference(device->rs_plan_s);
512
513         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
514
515         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
516                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
517         } else { /* normal path */
518                 want = dc->c_fill_target ? dc->c_fill_target :
519                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
520         }
521
522         correction = want - device->rs_in_flight - plan->total;
523
524         /* Plan ahead */
525         cps = correction / steps;
526         fifo_add_val(plan, cps);
527         plan->total += cps * steps;
528
529         /* What we do in this step */
530         curr_corr = fifo_push(plan, 0);
531         plan->total -= curr_corr;
532
533         req_sect = sect_in + curr_corr;
534         if (req_sect < 0)
535                 req_sect = 0;
536
537         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
538         if (req_sect > max_sect)
539                 req_sect = max_sect;
540
541         /*
542         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
543                  sect_in, device->rs_in_flight, want, correction,
544                  steps, cps, device->rs_planed, curr_corr, req_sect);
545         */
546
547         return req_sect;
548 }
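/* Worked example for the controller above (assumed numbers, illustration
 * only): with c_fill_target = 1000 sectors, rs_in_flight = 600 and
 * plan->total = 200 over steps = 10, the correction is
 * 1000 - 600 - 200 = 200 sectors, i.e. cps = 20 sectors added to each
 * planned step.  The amount actually requested in this SLEEP_TIME interval
 * is sect_in plus whatever the oldest FIFO slot had accumulated, clamped to
 * max_sect; c_max_rate is configured in KiB/s, hence the conversion
 * "* 2 * SLEEP_TIME / HZ" to sectors per interval.
 */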
549
550 static int drbd_rs_number_requests(struct drbd_device *device)
551 {
552         int number;
553
554         rcu_read_lock();
555         if (rcu_dereference(device->rs_plan_s)->size) {
556                 number = drbd_rs_controller(device) >> (BM_BLOCK_SHIFT - 9);
557                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
558         } else {
559                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
560                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
561         }
562         rcu_read_unlock();
563
564         /* ignore the amount of pending requests, the resync controller should
565          * throttle down to the incoming reply rate soon enough anyway. */
566         return number;
567 }
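/* Unit check for the conversion above (illustration): BM_BLOCK_SIZE is
 * 4 KiB = 8 sectors, so a sector count becomes a request count via
 * ">> (BM_BLOCK_SHIFT - 9)".  In the fixed-rate branch, assuming
 * resync_rate = 250 KiB/s and SLEEP_TIME = HZ/10:
 *	number = (HZ/10) * 250 / ((4096/1024) * HZ) = 250 / 40 = 6
 * resync requests per 100ms timer tick.
 */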
568
569 int w_make_resync_request(struct drbd_work *w, int cancel)
570 {
571         struct drbd_device_work *dw = device_work(w);
572         struct drbd_device *device = dw->device;
573         unsigned long bit;
574         sector_t sector;
575         const sector_t capacity = drbd_get_capacity(device->this_bdev);
576         int max_bio_size;
577         int number, rollback_i, size;
578         int align, queued, sndbuf;
579         int i = 0;
580
581         if (unlikely(cancel))
582                 return 0;
583
584         if (device->rs_total == 0) {
585                 /* empty resync? */
586                 drbd_resync_finished(device);
587                 return 0;
588         }
589
590         if (!get_ldev(device)) {
591                 /* Since we only need to access device->rsync,
592                    get_ldev_if_state(device, D_FAILED) would be sufficient, but
593                    continuing the resync with a broken disk makes no sense at
594                    all */
595                 drbd_err(device, "Disk broke down during resync!\n");
596                 return 0;
597         }
598
599         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
600         number = drbd_rs_number_requests(device);
601         if (number == 0)
602                 goto requeue;
603
604         for (i = 0; i < number; i++) {
605                 /* Stop generating RS requests when half of the send buffer is filled */
606                 mutex_lock(&first_peer_device(device)->connection->data.mutex);
607                 if (first_peer_device(device)->connection->data.socket) {
608                         queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
609                         sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
610                 } else {
611                         queued = 1;
612                         sndbuf = 0;
613                 }
614                 mutex_unlock(&first_peer_device(device)->connection->data.mutex);
615                 if (queued > sndbuf / 2)
616                         goto requeue;
617
618 next_sector:
619                 size = BM_BLOCK_SIZE;
620                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
621
622                 if (bit == DRBD_END_OF_BITMAP) {
623                         device->bm_resync_fo = drbd_bm_bits(device);
624                         put_ldev(device);
625                         return 0;
626                 }
627
628                 sector = BM_BIT_TO_SECT(bit);
629
630                 if (drbd_rs_should_slow_down(device, sector) ||
631                     drbd_try_rs_begin_io(device, sector)) {
632                         device->bm_resync_fo = bit;
633                         goto requeue;
634                 }
635                 device->bm_resync_fo = bit + 1;
636
637                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
638                         drbd_rs_complete_io(device, sector);
639                         goto next_sector;
640                 }
641
642 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
643                 /* try to find some adjacent bits.
644                  * we stop if we already have the maximum req size.
645                  *
646                  * Additionally always align bigger requests, in order to
647                  * be prepared for all stripe sizes of software RAIDs.
648                  */
649                 align = 1;
650                 rollback_i = i;
651                 for (;;) {
652                         if (size + BM_BLOCK_SIZE > max_bio_size)
653                                 break;
654
655                         /* Always be aligned */
656                         if (sector & ((1<<(align+3))-1))
657                                 break;
658
659                         /* do not cross extent boundaries */
660                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
661                                 break;
662                         /* now, is it actually dirty, after all?
663                          * caution, drbd_bm_test_bit is tri-state for some
664                          * obscure reason; a check for ( b == 0 ) would handle the
665                          * out-of-band value only accidentally right, because of the
666                          * "oddly sized" adjustment below */
667                         if (drbd_bm_test_bit(device, bit+1) != 1)
668                                 break;
669                         bit++;
670                         size += BM_BLOCK_SIZE;
671                         if ((BM_BLOCK_SIZE << align) <= size)
672                                 align++;
673                         i++;
674                 }
675                 /* if we merged some,
676                  * reset the offset to start the next drbd_bm_find_next from */
677                 if (size > BM_BLOCK_SIZE)
678                         device->bm_resync_fo = bit + 1;
679 #endif
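                /* Example of the alignment rule above (illustration only):
                 * with align = 1 the mask is (1 << 4) - 1, so merging past a
                 * single BM_BLOCK requires the start sector to be 16-sector
                 * (8 KiB) aligned; each time the request doubles in size,
                 * "align" is bumped and the required alignment doubles with
                 * it, which keeps big requests stripe-friendly for software
                 * RAID below. */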
680
681                 /* adjust very last sectors, in case we are oddly sized */
682                 if (sector + (size>>9) > capacity)
683                         size = (capacity-sector)<<9;
684                 if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
685                     first_peer_device(device)->connection->csums_tfm) {
686                         switch (read_for_csum(first_peer_device(device), sector, size)) {
687                         case -EIO: /* Disk failure */
688                                 put_ldev(device);
689                                 return -EIO;
690                         case -EAGAIN: /* allocation failed, or ldev busy */
691                                 drbd_rs_complete_io(device, sector);
692                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
693                                 i = rollback_i;
694                                 goto requeue;
695                         case 0:
696                                 /* everything ok */
697                                 break;
698                         default:
699                                 BUG();
700                         }
701                 } else {
702                         int err;
703
704                         inc_rs_pending(device);
705                         err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
706                                                  sector, size, ID_SYNCER);
707                         if (err) {
708                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
709                                 dec_rs_pending(device);
710                                 put_ldev(device);
711                                 return err;
712                         }
713                 }
714         }
715
716         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
717                 /* last syncer _request_ was sent,
718                  * but the P_RS_DATA_REPLY has not yet been received.  sync will end (and
719                  * the next sync group will resume) as soon as we receive the last
720                  * resync data block, and the last bit is cleared.
721                  * until then, resync "work" is "inactive" ...
722                  */
723                 put_ldev(device);
724                 return 0;
725         }
726
727  requeue:
728         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
729         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
730         put_ldev(device);
731         return 0;
732 }
733
734 static int w_make_ov_request(struct drbd_work *w, int cancel)
735 {
736         struct drbd_device *device = device_work(w)->device;
737         int number, i, size;
738         sector_t sector;
739         const sector_t capacity = drbd_get_capacity(device->this_bdev);
740         bool stop_sector_reached = false;
741
742         if (unlikely(cancel))
743                 return 1;
744
745         number = drbd_rs_number_requests(device);
746
747         sector = device->ov_position;
748         for (i = 0; i < number; i++) {
749                 if (sector >= capacity)
750                         return 1;
751
752                 /* We check for "finished" only in the reply path:
753                  * w_e_end_ov_reply().
754                  * We need to send at least one request out. */
755                 stop_sector_reached = i > 0
756                         && verify_can_do_stop_sector(device)
757                         && sector >= device->ov_stop_sector;
758                 if (stop_sector_reached)
759                         break;
760
761                 size = BM_BLOCK_SIZE;
762
763                 if (drbd_rs_should_slow_down(device, sector) ||
764                     drbd_try_rs_begin_io(device, sector)) {
765                         device->ov_position = sector;
766                         goto requeue;
767                 }
768
769                 if (sector + (size>>9) > capacity)
770                         size = (capacity-sector)<<9;
771
772                 inc_rs_pending(device);
773                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
774                         dec_rs_pending(device);
775                         return 0;
776                 }
777                 sector += BM_SECT_PER_BIT;
778         }
779         device->ov_position = sector;
780
781  requeue:
782         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
783         if (i == 0 || !stop_sector_reached)
784                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
785         return 1;
786 }
787
788 int w_ov_finished(struct drbd_work *w, int cancel)
789 {
790         struct drbd_device_work *dw =
791                 container_of(w, struct drbd_device_work, w);
792         struct drbd_device *device = dw->device;
793         kfree(dw);
794         ov_out_of_sync_print(device);
795         drbd_resync_finished(device);
796
797         return 0;
798 }
799
800 static int w_resync_finished(struct drbd_work *w, int cancel)
801 {
802         struct drbd_device_work *dw =
803                 container_of(w, struct drbd_device_work, w);
804         struct drbd_device *device = dw->device;
805         kfree(dw);
806
807         drbd_resync_finished(device);
808
809         return 0;
810 }
811
812 static void ping_peer(struct drbd_device *device)
813 {
814         struct drbd_connection *connection = first_peer_device(device)->connection;
815
816         clear_bit(GOT_PING_ACK, &connection->flags);
817         request_ping(connection);
818         wait_event(connection->ping_wait,
819                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
820 }
821
822 int drbd_resync_finished(struct drbd_device *device)
823 {
824         unsigned long db, dt, dbdt;
825         unsigned long n_oos;
826         union drbd_state os, ns;
827         struct drbd_device_work *dw;
828         char *khelper_cmd = NULL;
829         int verify_done = 0;
830
831         /* Remove all elements from the resync LRU. Since future actions
832          * might set bits in the (main) bitmap, the entries in the
833          * resync LRU would otherwise be wrong. */
834         if (drbd_rs_del_all(device)) {
835                 /* In case this is not possible now, most probably because
836                  * there are P_RS_DATA_REPLY packets lingering on the worker's
837                  * queue (or even the read operations for those packets
838                  * are not finished by now).  Retry in 100ms. */
839
840                 schedule_timeout_interruptible(HZ / 10);
841                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
842                 if (dw) {
843                         dw->w.cb = w_resync_finished;
844                         dw->device = device;
845                         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
846                                         &dw->w);
847                         return 1;
848                 }
849                 drbd_err(device, "Warning: failed to drbd_rs_del_all() and to kmalloc(dw).\n");
850         }
851
852         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
853         if (dt <= 0)
854                 dt = 1;
855
856         db = device->rs_total;
857         /* adjust for verify start and stop sectors, respectively the reached position */
858         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
859                 db -= device->ov_left;
860
861         dbdt = Bit2KB(db/dt);
862         device->rs_paused /= HZ;
863
864         if (!get_ldev(device))
865                 goto out;
866
867         ping_peer(device);
868
869         spin_lock_irq(&device->resource->req_lock);
870         os = drbd_read_state(device);
871
872         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
873
874         /* This protects us against multiple calls (that can happen in the presence
875            of application IO), and against connectivity loss just before we arrive here. */
876         if (os.conn <= C_CONNECTED)
877                 goto out_unlock;
878
879         ns = os;
880         ns.conn = C_CONNECTED;
881
882         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
883              verify_done ? "Online verify" : "Resync",
884              dt + device->rs_paused, device->rs_paused, dbdt);
885
886         n_oos = drbd_bm_total_weight(device);
887
888         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
889                 if (n_oos) {
890                         drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
891                               n_oos, Bit2KB(1));
892                         khelper_cmd = "out-of-sync";
893                 }
894         } else {
895                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
896
897                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
898                         khelper_cmd = "after-resync-target";
899
900                 if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
901                         const unsigned long s = device->rs_same_csum;
902                         const unsigned long t = device->rs_total;
903                         const int ratio =
904                                 (t == 0)     ? 0 :
905                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
906                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
907                              "transferred %luK total %luK\n",
908                              ratio,
909                              Bit2KB(device->rs_same_csum),
910                              Bit2KB(device->rs_total - device->rs_same_csum),
911                              Bit2KB(device->rs_total));
912                 }
913         }
914
915         if (device->rs_failed) {
916                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
917
918                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
919                         ns.disk = D_INCONSISTENT;
920                         ns.pdsk = D_UP_TO_DATE;
921                 } else {
922                         ns.disk = D_UP_TO_DATE;
923                         ns.pdsk = D_INCONSISTENT;
924                 }
925         } else {
926                 ns.disk = D_UP_TO_DATE;
927                 ns.pdsk = D_UP_TO_DATE;
928
929                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
930                         if (device->p_uuid) {
931                                 int i;
932                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
933                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
934                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
935                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
936                         } else {
937                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
938                         }
939                 }
940
941                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
942                         /* for verify runs, we don't update uuids here,
943                          * so there would be nothing to report. */
944                         drbd_uuid_set_bm(device, 0UL);
945                         drbd_print_uuids(device, "updated UUIDs");
946                         if (device->p_uuid) {
947                                 /* Now the two UUID sets are equal, update what we
948                                  * know of the peer. */
949                                 int i;
950                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
951                                         device->p_uuid[i] = device->ldev->md.uuid[i];
952                         }
953                 }
954         }
955
956         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
957 out_unlock:
958         spin_unlock_irq(&device->resource->req_lock);
959         put_ldev(device);
960 out:
961         device->rs_total  = 0;
962         device->rs_failed = 0;
963         device->rs_paused = 0;
964
965         /* reset start sector, if we reached end of device */
966         if (verify_done && device->ov_left == 0)
967                 device->ov_start_sector = 0;
968
969         drbd_md_sync(device);
970
971         if (khelper_cmd)
972                 drbd_khelper(device, khelper_cmd);
973
974         return 1;
975 }
976
977 /* helper */
978 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
979 {
980         if (drbd_peer_req_has_active_page(peer_req)) {
981                 /* This might happen if sendpage() has not finished */
982                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
983                 atomic_add(i, &device->pp_in_use_by_net);
984                 atomic_sub(i, &device->pp_in_use);
985                 spin_lock_irq(&device->resource->req_lock);
986                 list_add_tail(&peer_req->dw.w.list, &device->net_ee);
987                 spin_unlock_irq(&device->resource->req_lock);
988                 wake_up(&drbd_pp_wait);
989         } else
990                 drbd_free_peer_req(device, peer_req);
991 }
992
993 /**
994  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
996  * @w:          work object.
997  * @cancel:     The connection will be closed anyway
998  */
999 int w_e_end_data_req(struct drbd_work *w, int cancel)
1000 {
1001         struct drbd_device_work *dw = device_work(w);
1002         struct drbd_peer_request *peer_req = container_of(dw, struct drbd_peer_request, dw);
1003         struct drbd_device *device = dw->device;
1004         int err;
1005
1006         if (unlikely(cancel)) {
1007                 drbd_free_peer_req(device, peer_req);
1008                 dec_unacked(device);
1009                 return 0;
1010         }
1011
1012         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1013                 err = drbd_send_block(first_peer_device(device), P_DATA_REPLY, peer_req);
1014         } else {
1015                 if (__ratelimit(&drbd_ratelimit_state))
1016                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1017                             (unsigned long long)peer_req->i.sector);
1018
1019                 err = drbd_send_ack(first_peer_device(device), P_NEG_DREPLY, peer_req);
1020         }
1021
1022         dec_unacked(device);
1023
1024         move_to_net_ee_or_free(device, peer_req);
1025
1026         if (unlikely(err))
1027                 drbd_err(device, "drbd_send_block() failed\n");
1028         return err;
1029 }
1030
1031 /**
1032  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1033  * @w:          work object.
1034  * @cancel:     The connection will be closed anyway
1035  */
1036 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1037 {
1038         struct drbd_device_work *dw = device_work(w);
1039         struct drbd_peer_request *peer_req = container_of(dw, struct drbd_peer_request, dw);
1040         struct drbd_device *device = dw->device;
1041         int err;
1042
1043         if (unlikely(cancel)) {
1044                 drbd_free_peer_req(device, peer_req);
1045                 dec_unacked(device);
1046                 return 0;
1047         }
1048
1049         if (get_ldev_if_state(device, D_FAILED)) {
1050                 drbd_rs_complete_io(device, peer_req->i.sector);
1051                 put_ldev(device);
1052         }
1053
1054         if (device->state.conn == C_AHEAD) {
1055                 err = drbd_send_ack(first_peer_device(device), P_RS_CANCEL, peer_req);
1056         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1057                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1058                         inc_rs_pending(device);
1059                         err = drbd_send_block(first_peer_device(device), P_RS_DATA_REPLY, peer_req);
1060                 } else {
1061                         if (__ratelimit(&drbd_ratelimit_state))
1062                                 drbd_err(device, "Not sending RSDataReply, "
1063                                     "partner DISKLESS!\n");
1064                         err = 0;
1065                 }
1066         } else {
1067                 if (__ratelimit(&drbd_ratelimit_state))
1068                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1069                             (unsigned long long)peer_req->i.sector);
1070
1071                 err = drbd_send_ack(first_peer_device(device), P_NEG_RS_DREPLY, peer_req);
1072
1073                 /* update resync data with failure */
1074                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1075         }
1076
1077         dec_unacked(device);
1078
1079         move_to_net_ee_or_free(device, peer_req);
1080
1081         if (unlikely(err))
1082                 drbd_err(device, "drbd_send_block() failed\n");
1083         return err;
1084 }
1085
1086 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1087 {
1088         struct drbd_device_work *dw = device_work(w);
1089         struct drbd_peer_request *peer_req = container_of(dw, struct drbd_peer_request, dw);
1090         struct drbd_device *device = dw->device;
1091         struct digest_info *di;
1092         int digest_size;
1093         void *digest = NULL;
1094         int err, eq = 0;
1095
1096         if (unlikely(cancel)) {
1097                 drbd_free_peer_req(device, peer_req);
1098                 dec_unacked(device);
1099                 return 0;
1100         }
1101
1102         if (get_ldev(device)) {
1103                 drbd_rs_complete_io(device, peer_req->i.sector);
1104                 put_ldev(device);
1105         }
1106
1107         di = peer_req->digest;
1108
1109         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1110                 /* quick hack to try to avoid a race against reconfiguration.
1111                  * a real fix would be much more involved,
1112                  * introducing more locking mechanisms */
1113                 if (first_peer_device(device)->connection->csums_tfm) {
1114                         digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->csums_tfm);
1115                         D_ASSERT(device, digest_size == di->digest_size);
1116                         digest = kmalloc(digest_size, GFP_NOIO);
1117                 }
1118                 if (digest) {
1119                         drbd_csum_ee(first_peer_device(device)->connection->csums_tfm, peer_req, digest);
1120                         eq = !memcmp(digest, di->digest, digest_size);
1121                         kfree(digest);
1122                 }
1123
1124                 if (eq) {
1125                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1126                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1127                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1128                         err = drbd_send_ack(first_peer_device(device), P_RS_IS_IN_SYNC, peer_req);
1129                 } else {
1130                         inc_rs_pending(device);
1131                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1132                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1133                         kfree(di);
1134                         err = drbd_send_block(first_peer_device(device), P_RS_DATA_REPLY, peer_req);
1135                 }
1136         } else {
1137                 err = drbd_send_ack(first_peer_device(device), P_NEG_RS_DREPLY, peer_req);
1138                 if (__ratelimit(&drbd_ratelimit_state))
1139                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1140         }
1141
1142         dec_unacked(device);
1143         move_to_net_ee_or_free(device, peer_req);
1144
1145         if (unlikely(err))
1146                 drbd_err(device, "drbd_send_block/ack() failed\n");
1147         return err;
1148 }
1149
1150 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1151 {
1152         struct drbd_device_work *dw = device_work(w);
1153         struct drbd_peer_request *peer_req = container_of(dw, struct drbd_peer_request, dw);
1154         struct drbd_device *device = dw->device;
1155         sector_t sector = peer_req->i.sector;
1156         unsigned int size = peer_req->i.size;
1157         int digest_size;
1158         void *digest;
1159         int err = 0;
1160
1161         if (unlikely(cancel))
1162                 goto out;
1163
1164         digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->verify_tfm);
1165         digest = kmalloc(digest_size, GFP_NOIO);
1166         if (!digest) {
1167                 err = 1;        /* terminate the connection in case the allocation failed */
1168                 goto out;
1169         }
1170
1171         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1172                 drbd_csum_ee(first_peer_device(device)->connection->verify_tfm, peer_req, digest);
1173         else
1174                 memset(digest, 0, digest_size);
1175
1176         /* Free e and pages before send.
1177          * In case we block on congestion, we could otherwise run into
1178          * some distributed deadlock, if the other side blocks on
1179          * congestion as well, because our receiver blocks in
1180          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1181         drbd_free_peer_req(device, peer_req);
1182         peer_req = NULL;
1183         inc_rs_pending(device);
1184         err = drbd_send_drequest_csum(first_peer_device(device), sector, size, digest, digest_size, P_OV_REPLY);
1185         if (err)
1186                 dec_rs_pending(device);
1187         kfree(digest);
1188
1189 out:
1190         if (peer_req)
1191                 drbd_free_peer_req(device, peer_req);
1192         dec_unacked(device);
1193         return err;
1194 }
1195
1196 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1197 {
1198         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1199                 device->ov_last_oos_size += size>>9;
1200         } else {
1201                 device->ov_last_oos_start = sector;
1202                 device->ov_last_oos_size = size>>9;
1203         }
1204         drbd_set_out_of_sync(device, sector, size);
1205 }
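/* Example for the range merging above (assumed sectors, illustration only):
 * a first mismatch at sector 1000 with size 4096 records
 * ov_last_oos_start = 1000, ov_last_oos_size = 8.  A second mismatch at
 * sector 1008 is contiguous (1000 + 8 == 1008) and only grows
 * ov_last_oos_size to 16, while a mismatch at sector 2000 would start a new
 * run; ov_out_of_sync_print() later reports one line per such run.
 */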
1206
1207 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1208 {
1209         struct drbd_device_work *dw = device_work(w);
1210         struct drbd_peer_request *peer_req = container_of(dw, struct drbd_peer_request, dw);
1211         struct drbd_device *device = dw->device;
1212         struct digest_info *di;
1213         void *digest;
1214         sector_t sector = peer_req->i.sector;
1215         unsigned int size = peer_req->i.size;
1216         int digest_size;
1217         int err, eq = 0;
1218         bool stop_sector_reached = false;
1219
1220         if (unlikely(cancel)) {
1221                 drbd_free_peer_req(device, peer_req);
1222                 dec_unacked(device);
1223                 return 0;
1224         }
1225
1226         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1227          * the resync lru has been cleaned up already */
1228         if (get_ldev(device)) {
1229                 drbd_rs_complete_io(device, peer_req->i.sector);
1230                 put_ldev(device);
1231         }
1232
1233         di = peer_req->digest;
1234
1235         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1236                 digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->verify_tfm);
1237                 digest = kmalloc(digest_size, GFP_NOIO);
1238                 if (digest) {
1239                         drbd_csum_ee(first_peer_device(device)->connection->verify_tfm, peer_req, digest);
1240
1241                         D_ASSERT(device, digest_size == di->digest_size);
1242                         eq = !memcmp(digest, di->digest, digest_size);
1243                         kfree(digest);
1244                 }
1245         }
1246
1247         /* Free peer_req and pages before send.
1248          * In case we block on congestion, we could otherwise run into
1249          * some distributed deadlock, if the other side blocks on
1250          * congestion as well, because our receiver blocks in
1251          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1252         drbd_free_peer_req(device, peer_req);
1253         if (!eq)
1254                 drbd_ov_out_of_sync_found(device, sector, size);
1255         else
1256                 ov_out_of_sync_print(device);
1257
1258         err = drbd_send_ack_ex(first_peer_device(device), P_OV_RESULT, sector, size,
1259                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1260
1261         dec_unacked(device);
1262
1263         --device->ov_left;
1264
1265         /* let's advance progress step marks only for every other megabyte */
1266         if ((device->ov_left & 0x200) == 0x200)
1267                 drbd_advance_rs_marks(device, device->ov_left);
1268
1269         stop_sector_reached = verify_can_do_stop_sector(device) &&
1270                 (sector + (size>>9)) >= device->ov_stop_sector;
1271
1272         if (device->ov_left == 0 || stop_sector_reached) {
1273                 ov_out_of_sync_print(device);
1274                 drbd_resync_finished(device);
1275         }
1276
1277         return err;
1278 }
1279
1280 /* FIXME
1281  * We need to track the number of pending barrier acks,
1282  * and to be able to wait for them.
1283  * See also comment in drbd_adm_attach before drbd_suspend_io.
1284  */
1285 static int drbd_send_barrier(struct drbd_connection *connection)
1286 {
1287         struct p_barrier *p;
1288         struct drbd_socket *sock;
1289
1290         sock = &connection->data;
1291         p = conn_prepare_command(connection, sock);
1292         if (!p)
1293                 return -EIO;
1294         p->barrier = connection->send.current_epoch_nr;
1295         p->pad = 0;
1296         connection->send.current_epoch_writes = 0;
1297
1298         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1299 }
1300
1301 int w_send_write_hint(struct drbd_work *w, int cancel)
1302 {
1303         struct drbd_device *device =
1304                 container_of(w, struct drbd_device, unplug_work);
1305         struct drbd_socket *sock;
1306
1307         if (cancel)
1308                 return 0;
1309         sock = &first_peer_device(device)->connection->data;
1310         if (!drbd_prepare_command(first_peer_device(device), sock))
1311                 return -EIO;
1312         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1313 }
1314
1315 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1316 {
1317         if (!connection->send.seen_any_write_yet) {
1318                 connection->send.seen_any_write_yet = true;
1319                 connection->send.current_epoch_nr = epoch;
1320                 connection->send.current_epoch_writes = 0;
1321         }
1322 }
1323
1324 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1325 {
1326         /* before the first write on this connection there is no epoch to close */
1327         if (!connection->send.seen_any_write_yet)
1328                 return;
1329         if (connection->send.current_epoch_nr != epoch) {
1330                 if (connection->send.current_epoch_writes)
1331                         drbd_send_barrier(connection);
1332                 connection->send.current_epoch_nr = epoch;
1333         }
1334 }
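/* Illustration of the epoch bookkeeping above (assumed epoch numbers): if
 * requests with req->epoch 5, 5, 6 reach the sender in that order, the first
 * two only increment current_epoch_writes; the third sees a new epoch, so
 * maybe_send_barrier() emits a P_BARRIER closing epoch 5 (because it had
 * writes) before the payload for epoch 6 goes out.
 */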
1335
1336 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1337 {
1338         struct drbd_request *req = container_of(w, struct drbd_request, w);
1339         struct drbd_device *device = req->device;
1340         struct drbd_connection *connection = first_peer_device(device)->connection;
1341         int err;
1342
1343         if (unlikely(cancel)) {
1344                 req_mod(req, SEND_CANCELED);
1345                 return 0;
1346         }
1347
1348         /* this time, no connection->send.current_epoch_writes++;
1349          * If it was sent, it was the closing barrier for the last
1350          * replicated epoch, before we went into AHEAD mode.
1351          * No more barriers will be sent, until we leave AHEAD mode again. */
1352         maybe_send_barrier(connection, req->epoch);
1353
1354         err = drbd_send_out_of_sync(first_peer_device(device), req);
1355         req_mod(req, OOS_HANDED_TO_NETWORK);
1356
1357         return err;
1358 }
1359
1360 /**
1361  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1362  * @w:          work object.
1363  * @cancel:     The connection will be closed anyway
1364  */
1365 int w_send_dblock(struct drbd_work *w, int cancel)
1366 {
1367         struct drbd_request *req = container_of(w, struct drbd_request, w);
1368         struct drbd_device *device = req->device;
1369         struct drbd_connection *connection = first_peer_device(device)->connection;
1370         int err;
1371
1372         if (unlikely(cancel)) {
1373                 req_mod(req, SEND_CANCELED);
1374                 return 0;
1375         }
1376
1377         re_init_if_first_write(connection, req->epoch);
1378         maybe_send_barrier(connection, req->epoch);
1379         connection->send.current_epoch_writes++;
1380
1381         err = drbd_send_dblock(first_peer_device(device), req);
1382         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1383
1384         return err;
1385 }
1386
1387 /**
1388  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1389  * @w:          work object.
1390  * @cancel:     The connection will be closed anyway
1391  */
1392 int w_send_read_req(struct drbd_work *w, int cancel)
1393 {
1394         struct drbd_request *req = container_of(w, struct drbd_request, w);
1395         struct drbd_device *device = req->device;
1396         struct drbd_connection *connection = first_peer_device(device)->connection;
1397         int err;
1398
1399         if (unlikely(cancel)) {
1400                 req_mod(req, SEND_CANCELED);
1401                 return 0;
1402         }
1403
1404         /* Even read requests may close a write epoch,
1405          * if there has been one already. */
1406         maybe_send_barrier(connection, req->epoch);
1407
1408         err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
1409                                  (unsigned long)req);
1410
1411         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1412
1413         return err;
1414 }
1415
1416 int w_restart_disk_io(struct drbd_work *w, int cancel)
1417 {
1418         struct drbd_request *req = container_of(w, struct drbd_request, w);
1419         struct drbd_device *device = req->device;
1420
1421         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1422                 drbd_al_begin_io(device, &req->i, false);
1423
1424         drbd_req_make_private_bio(req, req->master_bio);
1425         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1426         generic_make_request(req->private_bio);
1427
1428         return 0;
1429 }
1430
1431 static int _drbd_may_sync_now(struct drbd_device *device)
1432 {
1433         struct drbd_device *odev = device;
1434         int resync_after;
1435
1436         while (1) {
1437                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1438                         return 1;
1439                 rcu_read_lock();
1440                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1441                 rcu_read_unlock();
1442                 if (resync_after == -1)
1443                         return 1;
1444                 odev = minor_to_device(resync_after);
1445                 if (!odev)
1446                         return 1;
1447                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1448                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1449                     odev->state.aftr_isp || odev->state.peer_isp ||
1450                     odev->state.user_isp)
1451                         return 0;
1452         }
1453 }
1454
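/* Example: with "resync-after = 1" configured on minor 2, _drbd_may_sync_now()
 * for minor 2 returns 0 as long as minor 1 is anywhere between C_SYNC_SOURCE
 * and C_PAUSED_SYNC_T or has one of its *_isp flags set, and 1 otherwise.
 * (Minor numbers picked purely for illustration.) */
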
1455 /**
1456  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1457  * @device:     DRBD device.
1458  *
1459  * Called from process context only (admin command and after_state_ch).
1460  */
1461 static int _drbd_pause_after(struct drbd_device *device)
1462 {
1463         struct drbd_device *odev;
1464         int i, rv = 0;
1465
1466         rcu_read_lock();
1467         idr_for_each_entry(&drbd_devices, odev, i) {
1468                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1469                         continue;
1470                 if (!_drbd_may_sync_now(odev))
1471                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1472                                != SS_NOTHING_TO_DO);
1473         }
1474         rcu_read_unlock();
1475
1476         return rv;
1477 }
1478
1479 /**
1480  * _drbd_resume_next() - Resume resync on all devices that may resync now
1481  * @device:     DRBD device.
1482  *
1483  * Called from process context only (admin command and worker).
1484  */
1485 static int _drbd_resume_next(struct drbd_device *device)
1486 {
1487         struct drbd_device *odev;
1488         int i, rv = 0;
1489
1490         rcu_read_lock();
1491         idr_for_each_entry(&drbd_devices, odev, i) {
1492                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1493                         continue;
1494                 if (odev->state.aftr_isp) {
1495                         if (_drbd_may_sync_now(odev))
1496                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1497                                                         CS_HARD, NULL)
1498                                        != SS_NOTHING_TO_DO);
1499                 }
1500         }
1501         rcu_read_unlock();
1502         return rv;
1503 }
1504
1505 void resume_next_sg(struct drbd_device *device)
1506 {
1507         write_lock_irq(&global_state_lock);
1508         _drbd_resume_next(device);
1509         write_unlock_irq(&global_state_lock);
1510 }
1511
1512 void suspend_other_sg(struct drbd_device *device)
1513 {
1514         write_lock_irq(&global_state_lock);
1515         _drbd_pause_after(device);
1516         write_unlock_irq(&global_state_lock);
1517 }
1518
1519 /* caller must hold global_state_lock */
1520 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1521 {
1522         struct drbd_device *odev;
1523         int resync_after;
1524
1525         if (o_minor == -1)
1526                 return NO_ERROR;
1527         if (o_minor < -1 || o_minor > MINORMASK)
1528                 return ERR_RESYNC_AFTER;
1529
1530         /* check for loops */
1531         odev = minor_to_device(o_minor);
1532         while (1) {
1533                 if (odev == device)
1534                         return ERR_RESYNC_AFTER_CYCLE;
1535
1536                 /* You are free to depend on diskless, non-existing,
1537                  * or not yet/no longer existing minors.
1538                  * We only reject dependency loops.
1539                  * We cannot follow the dependency chain beyond a detached or
1540                  * missing minor.
1541                  */
1542                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1543                         return NO_ERROR;
1544
1545                 rcu_read_lock();
1546                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1547                 rcu_read_unlock();
1548                 /* dependency chain ends here, no cycles. */
1549                 if (resync_after == -1)
1550                         return NO_ERROR;
1551
1552                 /* follow the dependency chain */
1553                 odev = minor_to_device(resync_after);
1554         }
1555 }
1556
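/*
 * Editor's sketch (illustrative only, not driver code): the cycle check above,
 * restated over a plain array of resync-after values so the rejection case is
 * easy to follow.  E.g. setting minor 0's resync-after to 1 while minor 1
 * already points back at minor 0 walks 1 -> 0 == self and is rejected.
 * would_create_cycle() and its parameters are invented for the example; like
 * the driver, it relies on the already-committed table being acyclic.
 */
#if 0	/* example only, not built */
static bool would_create_cycle(const int *resync_after, int n_minors,
			       int self, int depends_on)
{
	int cur = depends_on;

	/* -1 (or an out-of-range minor) ends the dependency chain */
	while (cur >= 0 && cur < n_minors) {
		if (cur == self)
			return true;	/* we would close a loop: reject */
		cur = resync_after[cur];
	}
	return false;
}
#endif
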
1557 /* caller must hold global_state_lock */
1558 void drbd_resync_after_changed(struct drbd_device *device)
1559 {
1560         int changes;
1561
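        /* Changes to aftr_isp can cascade along the resync-after dependency
         * chain, so repeat until a full pause/resume pass changes nothing. */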
1562         do {
1563                 changes  = _drbd_pause_after(device);
1564                 changes |= _drbd_resume_next(device);
1565         } while (changes);
1566 }
1567
1568 void drbd_rs_controller_reset(struct drbd_device *device)
1569 {
1570         struct fifo_buffer *plan;
1571
1572         atomic_set(&device->rs_sect_in, 0);
1573         atomic_set(&device->rs_sect_ev, 0);
1574         device->rs_in_flight = 0;
1575
1576         /* Updating the RCU protected object in place is necessary since
1577            this function gets called from atomic context.
1578            It is valid since all other updates also lead to a completely
1579            empty fifo. */
1580         rcu_read_lock();
1581         plan = rcu_dereference(device->rs_plan_s);
1582         plan->total = 0;
1583         fifo_set(plan, 0);
1584         rcu_read_unlock();
1585 }
1586
1587 void start_resync_timer_fn(unsigned long data)
1588 {
1589         struct drbd_device *device = (struct drbd_device *) data;
1590
1591         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
1592                         &device->start_resync_work);
1593 }
1594
1595 int w_start_resync(struct drbd_work *w, int cancel)
1596 {
1597         struct drbd_device *device =
1598                 container_of(w, struct drbd_device, start_resync_work);
1599
1600         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1601                 drbd_warn(device, "w_start_resync later...\n");
1602                 device->start_resync_timer.expires = jiffies + HZ/10;
1603                 add_timer(&device->start_resync_timer);
1604                 return 0;
1605         }
1606
1607         drbd_start_resync(device, C_SYNC_SOURCE);
1608         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1609         return 0;
1610 }
1611
1612 /**
1613  * drbd_start_resync() - Start the resync process
1614  * @device:     DRBD device.
1615  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1616  *
1617  * This function might bring you directly into one of the
1618  * C_PAUSED_SYNC_* states.
1619  */
1620 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1621 {
1622         union drbd_state ns;
1623         int r;
1624
1625         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1626                 drbd_err(device, "Resync already running!\n");
1627                 return;
1628         }
1629
1630         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1631                 if (side == C_SYNC_TARGET) {
1632                         /* Since application IO was locked out during C_WF_BITMAP_T and
1633                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1634                            we check whether we may make the data inconsistent. */
1635                         r = drbd_khelper(device, "before-resync-target");
1636                         r = (r >> 8) & 0xff;
1637                         if (r > 0) {
1638                                 drbd_info(device, "before-resync-target handler returned %d, "
1639                                          "dropping connection.\n", r);
1640                                 conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD);
1641                                 return;
1642                         }
1643                 } else /* C_SYNC_SOURCE */ {
1644                         r = drbd_khelper(device, "before-resync-source");
1645                         r = (r >> 8) & 0xff;
1646                         if (r > 0) {
1647                                 if (r == 3) {
1648                                         drbd_info(device, "before-resync-source handler returned %d, "
1649                                                  "ignoring. Old userland tools?\n", r);
1650                                 } else {
1651                                         drbd_info(device, "before-resync-source handler returned %d, "
1652                                                  "dropping connection.\n", r);
1653                                         conn_request_state(first_peer_device(device)->connection,
1654                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1655                                         return;
1656                                 }
1657                         }
1658                 }
1659         }
1660
1661         if (current == first_peer_device(device)->connection->worker.task) {
1662                 /* The worker should not sleep waiting for state_mutex;
1663                    that can take a long time. */
1664                 if (!mutex_trylock(device->state_mutex)) {
1665                         set_bit(B_RS_H_DONE, &device->flags);
1666                         device->start_resync_timer.expires = jiffies + HZ/5;
1667                         add_timer(&device->start_resync_timer);
1668                         return;
1669                 }
1670         } else {
1671                 mutex_lock(device->state_mutex);
1672         }
1673         clear_bit(B_RS_H_DONE, &device->flags);
1674
1675         write_lock_irq(&global_state_lock);
1676         /* Did some connection breakage or IO error race with us? */
1677         if (device->state.conn < C_CONNECTED ||
1678             !get_ldev_if_state(device, D_NEGOTIATING)) {
1679                 write_unlock_irq(&global_state_lock);
1680                 mutex_unlock(device->state_mutex);
1681                 return;
1682         }
1683
1684         ns = drbd_read_state(device);
1685
1686         ns.aftr_isp = !_drbd_may_sync_now(device);
1687
1688         ns.conn = side;
1689
1690         if (side == C_SYNC_TARGET)
1691                 ns.disk = D_INCONSISTENT;
1692         else /* side == C_SYNC_SOURCE */
1693                 ns.pdsk = D_INCONSISTENT;
1694
1695         r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1696         ns = drbd_read_state(device);
1697
1698         if (ns.conn < C_CONNECTED)
1699                 r = SS_UNKNOWN_ERROR;
1700
1701         if (r == SS_SUCCESS) {
1702                 unsigned long tw = drbd_bm_total_weight(device);
1703                 unsigned long now = jiffies;
1704                 int i;
1705
1706                 device->rs_failed    = 0;
1707                 device->rs_paused    = 0;
1708                 device->rs_same_csum = 0;
1709                 device->rs_last_events = 0;
1710                 device->rs_last_sect_ev = 0;
1711                 device->rs_total     = tw;
1712                 device->rs_start     = now;
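                /* Prime every resync-rate sampling mark with the full amount
                 * of work and the current time, so early speed estimates start
                 * from "nothing done yet". */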
1713                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1714                         device->rs_mark_left[i] = tw;
1715                         device->rs_mark_time[i] = now;
1716                 }
1717                 _drbd_pause_after(device);
1718         }
1719         write_unlock_irq(&global_state_lock);
1720
1721         if (r == SS_SUCCESS) {
1722                 /* reset rs_last_bcast when a resync or verify is started,
1723                  * to deal with potential jiffies wrap. */
1724                 device->rs_last_bcast = jiffies - HZ;
1725
1726                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1727                      drbd_conn_str(ns.conn),
1728                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1729                      (unsigned long) device->rs_total);
1730                 if (side == C_SYNC_TARGET)
1731                         device->bm_resync_fo = 0;
1732
1733                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1734                  * with w_send_oos, or the sync target will get confused as to
1735                  * how many bits to resync.  We cannot always do that, because for an
1736                  * empty resync and protocol < 95, we need to do it here, as we call
1737                  * drbd_resync_finished from here in that case.
1738                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1739                  * and from after_state_ch otherwise. */
1740                 if (side == C_SYNC_SOURCE &&
1741                     first_peer_device(device)->connection->agreed_pro_version < 96)
1742                         drbd_gen_and_send_sync_uuid(first_peer_device(device));
1743
1744                 if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
1745                     device->rs_total == 0) {
1746                         /* This still has a race (about when exactly the peers
1747                          * detect connection loss) that can lead to a full sync
1748                          * on next handshake. In 8.3.9 we fixed this with explicit
1749                          * resync-finished notifications, but the fix
1750                          * introduces a protocol change.  Sleeping for some
1751                          * time longer than the ping interval + timeout on the
1752                          * SyncSource, to give the SyncTarget the chance to
1753                          * detect connection loss, then waiting for a ping
1754                          * response (implicit in drbd_resync_finished) reduces
1755                          * the race considerably, but does not solve it. */
1756                         if (side == C_SYNC_SOURCE) {
1757                                 struct net_conf *nc;
1758                                 int timeo;
1759
1760                                 rcu_read_lock();
1761                                 nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
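                                /* ping_int is configured in seconds, ping_timeo
                                 * in tenths of a second; dividing by 9 instead
                                 * of 10 errs on the long side of the
                                 * "ping interval + timeout" mentioned above. */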
1762                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1763                                 rcu_read_unlock();
1764                                 schedule_timeout_interruptible(timeo);
1765                         }
1766                         drbd_resync_finished(device);
1767                 }
1768
1769                 drbd_rs_controller_reset(device);
1770                 /* ns.conn may already be != device->state.conn,
1771                  * we may have been paused in between, or become paused until
1772                  * the timer triggers.
1773                  * Either way, that is handled in resync_timer_fn(). */
1774                 if (ns.conn == C_SYNC_TARGET)
1775                         mod_timer(&device->resync_timer, jiffies);
1776
1777                 drbd_md_sync(device);
1778         }
1779         put_ldev(device);
1780         mutex_unlock(device->state_mutex);
1781 }
1782
1783 /* If the resource already closed the current epoch, but we did not
1784  * (because we have not yet seen new requests), we should send the
1785  * corresponding barrier now.  Must be checked within the same spinlock
1786  * that is used to check for new requests. */
1787 static bool need_to_send_barrier(struct drbd_connection *connection)
1788 {
1789         if (!connection->send.seen_any_write_yet)
1790                 return false;
1791
1792         /* Skip barriers that do not contain any writes.
1793          * This may happen during AHEAD mode. */
1794         if (!connection->send.current_epoch_writes)
1795                 return false;
1796
1797         /* ->req_lock is held when requests are queued on
1798          * connection->sender_work, and put into ->transfer_log.
1799          * It is also held when ->current_tle_nr is increased.
1800          * So either there are already new requests queued,
1801          * and corresponding barriers will be sent there.
1802          * Or nothing new is queued yet, so the difference will be 1.
1803          */
1804         if (atomic_read(&connection->current_tle_nr) !=
1805             connection->send.current_epoch_nr + 1)
1806                 return false;
1807
1808         return true;
1809 }
1810
1811 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1812 {
1813         spin_lock_irq(&queue->q_lock);
1814         list_splice_init(&queue->q, work_list);
1815         spin_unlock_irq(&queue->q_lock);
1816         return !list_empty(work_list);
1817 }
1818
1819 static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1820 {
1821         spin_lock_irq(&queue->q_lock);
1822         if (!list_empty(&queue->q))
1823                 list_move(queue->q.next, work_list);
1824         spin_unlock_irq(&queue->q_lock);
1825         return !list_empty(work_list);
1826 }
1827
1828 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1829 {
1830         DEFINE_WAIT(wait);
1831         struct net_conf *nc;
1832         int uncork, cork;
1833
1834         dequeue_work_item(&connection->sender_work, work_list);
1835         if (!list_empty(work_list))
1836                 return;
1837
1838         /* Still nothing to do?
1839          * Maybe we still need to close the current epoch,
1840          * even if no new requests are queued yet.
1841          *
1842          * Also, poke TCP, just in case.
1843          * Then wait for new work (or signal). */
1844         rcu_read_lock();
1845         nc = rcu_dereference(connection->net_conf);
1846         uncork = nc ? nc->tcp_cork : 0;
1847         rcu_read_unlock();
1848         if (uncork) {
1849                 mutex_lock(&connection->data.mutex);
1850                 if (connection->data.socket)
1851                         drbd_tcp_uncork(connection->data.socket);
1852                 mutex_unlock(&connection->data.mutex);
1853         }
1854
1855         for (;;) {
1856                 int send_barrier;
1857                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1858                 spin_lock_irq(&connection->resource->req_lock);
1859                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
1860                 /* dequeue single item only,
1861                  * we still use drbd_queue_work_front() in some places */
1862                 if (!list_empty(&connection->sender_work.q))
1863                         list_move(connection->sender_work.q.next, work_list);
1864                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
1865                 if (!list_empty(work_list) || signal_pending(current)) {
1866                         spin_unlock_irq(&connection->resource->req_lock);
1867                         break;
1868                 }
1869                 send_barrier = need_to_send_barrier(connection);
1870                 spin_unlock_irq(&connection->resource->req_lock);
1871                 if (send_barrier) {
1872                         drbd_send_barrier(connection);
1873                         connection->send.current_epoch_nr++;
1874                 }
1875                 schedule();
1876                 /* We may be woken up for things other than new work, too,
1877                  * e.g. if the current epoch got closed,
1878                  * in which case we send the barrier above. */
1879         }
1880         finish_wait(&connection->sender_work.q_wait, &wait);
1881
1882         /* someone may have changed the config while we have been waiting above. */
1883         rcu_read_lock();
1884         nc = rcu_dereference(connection->net_conf);
1885         cork = nc ? nc->tcp_cork : 0;
1886         rcu_read_unlock();
1887         mutex_lock(&connection->data.mutex);
1888         if (connection->data.socket) {
1889                 if (cork)
1890                         drbd_tcp_cork(connection->data.socket);
1891                 else if (!uncork)
1892                         drbd_tcp_uncork(connection->data.socket);
1893         }
1894         mutex_unlock(&connection->data.mutex);
1895 }
1896
1897 int drbd_worker(struct drbd_thread *thi)
1898 {
1899         struct drbd_connection *connection = thi->connection;
1900         struct drbd_device_work *dw = NULL;
1901         struct drbd_peer_device *peer_device;
1902         LIST_HEAD(work_list);
1903         int vnr;
1904
1905         while (get_t_state(thi) == RUNNING) {
1906                 drbd_thread_current_set_cpu(thi);
1907
1908                 /* as long as we use drbd_queue_work_front(),
1909                  * we may only dequeue single work items here, not batches. */
1910                 if (list_empty(&work_list))
1911                         wait_for_work(connection, &work_list);
1912
1913                 if (signal_pending(current)) {
1914                         flush_signals(current);
1915                         if (get_t_state(thi) == RUNNING) {
1916                                 drbd_warn(connection, "Worker got an unexpected signal\n");
1917                                 continue;
1918                         }
1919                         break;
1920                 }
1921
1922                 if (get_t_state(thi) != RUNNING)
1923                         break;
1924
1925                 while (!list_empty(&work_list)) {
1926                         dw = list_first_entry(&work_list, struct drbd_device_work, w.list);
1927                         list_del_init(&dw->w.list);
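                        /* The second argument is the "cancel" flag: once the
                         * connection has dropped below C_WF_REPORT_PARAMS,
                         * work items only get a chance to clean up and bail. */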
1928                         if (dw->w.cb(&dw->w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
1929                                 continue;
1930                         if (connection->cstate >= C_WF_REPORT_PARAMS)
1931                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1932                 }
1933         }
1934
1935         do {
1936                 while (!list_empty(&work_list)) {
1937                         dw = list_first_entry(&work_list, struct drbd_device_work, w.list);
1938                         list_del_init(&dw->w.list);
1939                         dw->w.cb(&dw->w, 1);
1940                 }
1941                 dequeue_work_batch(&connection->sender_work, &work_list);
1942         } while (!list_empty(&work_list));
1943
1944         rcu_read_lock();
1945         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1946                 struct drbd_device *device = peer_device->device;
1947                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
1948                 kref_get(&device->kref);
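                /* drbd_device_cleanup() may sleep; leave the RCU read side for
                 * the call and rely on the kref taken above to keep the device
                 * alive meanwhile. */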
1949                 rcu_read_unlock();
1950                 drbd_device_cleanup(device);
1951                 kref_put(&device->kref, drbd_destroy_device);
1952                 rcu_read_lock();
1953         }
1954         rcu_read_unlock();
1955
1956         return 0;
1957 }