[kvmfornfv.git] kernel/drivers/block/drbd/drbd_worker.c (raw update to linux-4.4.6-rt14)
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58
59 /* About the global_state_lock
60    Each state transition on a device holds a read lock. In case we have
61    to evaluate the resync after dependencies, we grab a write lock, because
62    we need stable states on all devices for that.  */
63 rwlock_t global_state_lock;
64
65 /* used for synchronous meta data and bitmap IO
66  * submitted by drbd_md_sync_page_io()
67  */
68 void drbd_md_endio(struct bio *bio)
69 {
70         struct drbd_device *device;
71
72         device = bio->bi_private;
73         device->md_io.error = bio->bi_error;
74
75         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76          * to timeout on the lower level device, and eventually detach from it.
77          * If this io completion runs after that timeout expired, this
78          * drbd_md_put_buffer() may allow us to finally try and re-attach.
79          * During normal operation, this only puts that extra reference
80          * down to 1 again.
81          * Make sure we first drop the reference, and only then signal
82          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83          * next drbd_md_sync_page_io(), that we trigger the
84          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85          */
86         drbd_md_put_buffer(device);
87         device->md_io.done = 1;
88         wake_up(&device->misc_wait);
89         bio_put(bio);
90         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91                 put_ldev(device);
92 }
93
94 /* reads on behalf of the partner,
95  * "submitted" by the receiver
96  */
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
98 {
99         unsigned long flags = 0;
100         struct drbd_peer_device *peer_device = peer_req->peer_device;
101         struct drbd_device *device = peer_device->device;
102
103         spin_lock_irqsave(&device->resource->req_lock, flags);
104         device->read_cnt += peer_req->i.size >> 9;
105         list_del(&peer_req->w.list);
106         if (list_empty(&device->read_ee))
107                 wake_up(&device->ee_wait);
108         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
110         spin_unlock_irqrestore(&device->resource->req_lock, flags);
111
112         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113         put_ldev(device);
114 }
115
116 /* writes on behalf of the partner, or resync writes,
117  * "submitted" by the receiver, final stage.  */
118 void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
119 {
120         unsigned long flags = 0;
121         struct drbd_peer_device *peer_device = peer_req->peer_device;
122         struct drbd_device *device = peer_device->device;
123         struct drbd_interval i;
124         int do_wake;
125         u64 block_id;
126         int do_al_complete_io;
127
128         /* after we moved peer_req to done_ee,
129          * we may no longer access it,
130          * it may be freed/reused already!
131          * (as soon as we release the req_lock) */
132         i = peer_req->i;
133         do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
134         block_id = peer_req->block_id;
135         peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
136
137         spin_lock_irqsave(&device->resource->req_lock, flags);
138         device->writ_cnt += peer_req->i.size >> 9;
139         list_move_tail(&peer_req->w.list, &device->done_ee);
140
141         /*
142          * Do not remove from the write_requests tree here: we did not send the
143          * Ack yet and did not wake possibly waiting conflicting requests.
144          * Removal from the tree happens in "drbd_process_done_ee" within the
145          * appropriate dw.cb (e_end_block/e_end_resync_block) or in
146          * _drbd_clear_done_ee.
147          */
148
149         do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
150
151         /* FIXME do we want to detach for failed REQ_DISCARD?
152          * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
153         if (peer_req->flags & EE_WAS_ERROR)
154                 __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
155         spin_unlock_irqrestore(&device->resource->req_lock, flags);
156
157         if (block_id == ID_SYNCER)
158                 drbd_rs_complete_io(device, i.sector);
159
160         if (do_wake)
161                 wake_up(&device->ee_wait);
162
163         if (do_al_complete_io)
164                 drbd_al_complete_io(device, &i);
165
166         wake_asender(peer_device->connection);
167         put_ldev(device);
168 }
169
170 /* writes on behalf of the partner, or resync writes,
171  * "submitted" by the receiver.
172  */
173 void drbd_peer_request_endio(struct bio *bio)
174 {
175         struct drbd_peer_request *peer_req = bio->bi_private;
176         struct drbd_device *device = peer_req->peer_device->device;
177         int is_write = bio_data_dir(bio) == WRITE;
178         int is_discard = !!(bio->bi_rw & REQ_DISCARD);
179
180         if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
181                 drbd_warn(device, "%s: error=%d s=%llus\n",
182                                 is_write ? (is_discard ? "discard" : "write")
183                                         : "read", bio->bi_error,
184                                 (unsigned long long)peer_req->i.sector);
185
186         if (bio->bi_error)
187                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
188
189         bio_put(bio); /* no need for the bio anymore */
190         if (atomic_dec_and_test(&peer_req->pending_bios)) {
191                 if (is_write)
192                         drbd_endio_write_sec_final(peer_req);
193                 else
194                         drbd_endio_read_sec_final(peer_req);
195         }
196 }
197
198 /* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
199  */
200 void drbd_request_endio(struct bio *bio)
201 {
202         unsigned long flags;
203         struct drbd_request *req = bio->bi_private;
204         struct drbd_device *device = req->device;
205         struct bio_and_error m;
206         enum drbd_req_event what;
207
208         /* If this request was aborted locally before,
209          * but now was completed "successfully",
210          * chances are that this caused arbitrary data corruption.
211          *
212          * "aborting" requests, or force-detaching the disk, is intended for
213          * completely blocked/hung local backing devices which no longer
214          * complete requests at all, not even error completions.  In this
215          * situation, usually a hard-reset and failover is the only way out.
216          *
217          * By "aborting", basically faking a local error-completion,
218          * we allow for a more graceful switchover by cleanly migrating services.
219          * Still the affected node has to be rebooted "soon".
220          *
221          * By completing these requests, we allow the upper layers to re-use
222          * the associated data pages.
223          *
224          * If later the local backing device "recovers", and now DMAs some data
225          * from disk into the original request pages, in the best case it will
226          * just put random data into unused pages; but typically it will corrupt
227          * meanwhile completely unrelated data, causing all sorts of damage.
228          *
229          * Which means delayed successful completion,
230          * especially for READ requests,
231          * is a reason to panic().
232          *
233          * We assume that a delayed *error* completion is OK,
234          * though we still will complain noisily about it.
235          */
236         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
237                 if (__ratelimit(&drbd_ratelimit_state))
238                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
239
240                 if (!bio->bi_error)
241                         panic("possible random memory corruption caused by delayed completion of aborted local request\n");
242         }
243
244         /* to avoid recursion in __req_mod */
245         if (unlikely(bio->bi_error)) {
246                 if (bio->bi_rw & REQ_DISCARD)
247                         what = (bio->bi_error == -EOPNOTSUPP)
248                                 ? DISCARD_COMPLETED_NOTSUPP
249                                 : DISCARD_COMPLETED_WITH_ERROR;
250                 else
251                         what = (bio_data_dir(bio) == WRITE)
252                         ? WRITE_COMPLETED_WITH_ERROR
253                         : (bio_rw(bio) == READ)
254                           ? READ_COMPLETED_WITH_ERROR
255                           : READ_AHEAD_COMPLETED_WITH_ERROR;
256         } else
257                 what = COMPLETED_OK;
258
259         bio_put(req->private_bio);
260         req->private_bio = ERR_PTR(bio->bi_error);
261
262         /* not req_mod(), we need irqsave here! */
263         spin_lock_irqsave(&device->resource->req_lock, flags);
264         __req_mod(req, what, &m);
265         spin_unlock_irqrestore(&device->resource->req_lock, flags);
266         put_ldev(device);
267
268         if (m.bio)
269                 complete_master_bio(device, &m);
270 }
271
272 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
273 {
274         struct hash_desc desc;
275         struct scatterlist sg;
276         struct page *page = peer_req->pages;
277         struct page *tmp;
278         unsigned len;
279
280         desc.tfm = tfm;
281         desc.flags = 0;
282
283         sg_init_table(&sg, 1);
284         crypto_hash_init(&desc);
285
286         while ((tmp = page_chain_next(page))) {
287                 /* all but the last page will be fully used */
288                 sg_set_page(&sg, page, PAGE_SIZE, 0);
289                 crypto_hash_update(&desc, &sg, sg.length);
290                 page = tmp;
291         }
292         /* and now the last, possibly only partially used page */
293         len = peer_req->i.size & (PAGE_SIZE - 1);
294         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
295         crypto_hash_update(&desc, &sg, sg.length);
296         crypto_hash_final(&desc, digest);
297 }
298
299 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
300 {
301         struct hash_desc desc;
302         struct scatterlist sg;
303         struct bio_vec bvec;
304         struct bvec_iter iter;
305
306         desc.tfm = tfm;
307         desc.flags = 0;
308
309         sg_init_table(&sg, 1);
310         crypto_hash_init(&desc);
311
312         bio_for_each_segment(bvec, bio, iter) {
313                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
314                 crypto_hash_update(&desc, &sg, sg.length);
315         }
316         crypto_hash_final(&desc, digest);
317 }
318
319 /* MAYBE merge common code with w_e_end_ov_req */
320 static int w_e_send_csum(struct drbd_work *w, int cancel)
321 {
322         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
323         struct drbd_peer_device *peer_device = peer_req->peer_device;
324         struct drbd_device *device = peer_device->device;
325         int digest_size;
326         void *digest;
327         int err = 0;
328
329         if (unlikely(cancel))
330                 goto out;
331
332         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
333                 goto out;
334
335         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
336         digest = kmalloc(digest_size, GFP_NOIO);
337         if (digest) {
338                 sector_t sector = peer_req->i.sector;
339                 unsigned int size = peer_req->i.size;
340                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
341                 /* Free peer_req and pages before send.
342                  * In case we block on congestion, we could otherwise run into
343                  * some distributed deadlock, if the other side blocks on
344                  * congestion as well, because our receiver blocks in
345                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
346                 drbd_free_peer_req(device, peer_req);
347                 peer_req = NULL;
348                 inc_rs_pending(device);
349                 err = drbd_send_drequest_csum(peer_device, sector, size,
350                                               digest, digest_size,
351                                               P_CSUM_RS_REQUEST);
352                 kfree(digest);
353         } else {
354                 drbd_err(device, "kmalloc() of digest failed.\n");
355                 err = -ENOMEM;
356         }
357
358 out:
359         if (peer_req)
360                 drbd_free_peer_req(device, peer_req);
361
362         if (unlikely(err))
363                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
364         return err;
365 }
366
367 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
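/* Note: GFP_TRY is an opportunistic allocation mask: highmem pages are fine,
 * allocation-failure warnings are suppressed, and with no reclaim flags set the
 * attempt is expected to fail fast rather than block; read_for_csum() below
 * simply defers and retries later when the allocation fails. */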
368
369 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
370 {
371         struct drbd_device *device = peer_device->device;
372         struct drbd_peer_request *peer_req;
373
374         if (!get_ldev(device))
375                 return -EIO;
376
377         /* GFP_TRY, because if there is no memory available right now, this may
378          * be rescheduled for later. It is "only" background resync, after all. */
379         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
380                                        size, true /* has real payload */, GFP_TRY);
381         if (!peer_req)
382                 goto defer;
383
384         peer_req->w.cb = w_e_send_csum;
385         spin_lock_irq(&device->resource->req_lock);
386         list_add_tail(&peer_req->w.list, &device->read_ee);
387         spin_unlock_irq(&device->resource->req_lock);
388
389         atomic_add(size >> 9, &device->rs_sect_ev);
390         if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
391                 return 0;
392
393         /* If it failed because of ENOMEM, retry should help.  If it failed
394          * because bio_add_page failed (probably broken lower level driver),
395          * retry may or may not help.
396          * If it does not, you may need to force disconnect. */
397         spin_lock_irq(&device->resource->req_lock);
398         list_del(&peer_req->w.list);
399         spin_unlock_irq(&device->resource->req_lock);
400
401         drbd_free_peer_req(device, peer_req);
402 defer:
403         put_ldev(device);
404         return -EAGAIN;
405 }
406
407 int w_resync_timer(struct drbd_work *w, int cancel)
408 {
409         struct drbd_device *device =
410                 container_of(w, struct drbd_device, resync_work);
411
412         switch (device->state.conn) {
413         case C_VERIFY_S:
414                 make_ov_request(device, cancel);
415                 break;
416         case C_SYNC_TARGET:
417                 make_resync_request(device, cancel);
418                 break;
419         }
420
421         return 0;
422 }
423
424 void resync_timer_fn(unsigned long data)
425 {
426         struct drbd_device *device = (struct drbd_device *) data;
427
428         drbd_queue_work_if_unqueued(
429                 &first_peer_device(device)->connection->sender_work,
430                 &device->resync_work);
431 }
432
433 static void fifo_set(struct fifo_buffer *fb, int value)
434 {
435         int i;
436
437         for (i = 0; i < fb->size; i++)
438                 fb->values[i] = value;
439 }
440
441 static int fifo_push(struct fifo_buffer *fb, int value)
442 {
443         int ov;
444
445         ov = fb->values[fb->head_index];
446         fb->values[fb->head_index++] = value;
447
448         if (fb->head_index >= fb->size)
449                 fb->head_index = 0;
450
451         return ov;
452 }
453
454 static void fifo_add_val(struct fifo_buffer *fb, int value)
455 {
456         int i;
457
458         for (i = 0; i < fb->size; i++)
459                 fb->values[i] += value;
460 }
461
462 struct fifo_buffer *fifo_alloc(int fifo_size)
463 {
464         struct fifo_buffer *fb;
465
466         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
467         if (!fb)
468                 return NULL;
469
470         fb->head_index = 0;
471         fb->size = fifo_size;
472         fb->total = 0;
473
474         return fb;
475 }
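/* These fifo helpers implement the fixed-size circular "plan" consumed by
 * drbd_rs_controller() below: fifo_push() returns the correction planned for the
 * current step while queueing an empty slot for the step furthest in the future,
 * and fifo_add_val() spreads a newly computed correction evenly over all planned
 * steps. */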
476
477 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
478 {
479         struct disk_conf *dc;
480         unsigned int want;     /* The number of sectors we want in-flight */
481         int req_sect; /* Number of sectors to request in this turn */
482         int correction; /* Number of sectors more we need in-flight */
483         int cps; /* correction per invocation of drbd_rs_controller() */
484         int steps; /* Number of time steps to plan ahead */
485         int curr_corr;
486         int max_sect;
487         struct fifo_buffer *plan;
488
489         dc = rcu_dereference(device->ldev->disk_conf);
490         plan = rcu_dereference(device->rs_plan_s);
491
492         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
493
494         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
495                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
496         } else { /* normal path */
497                 want = dc->c_fill_target ? dc->c_fill_target :
498                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
499         }
500
501         correction = want - device->rs_in_flight - plan->total;
502
503         /* Plan ahead */
504         cps = correction / steps;
505         fifo_add_val(plan, cps);
506         plan->total += cps * steps;
507
508         /* What we do in this step */
509         curr_corr = fifo_push(plan, 0);
510         plan->total -= curr_corr;
511
512         req_sect = sect_in + curr_corr;
513         if (req_sect < 0)
514                 req_sect = 0;
515
516         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
517         if (req_sect > max_sect)
518                 req_sect = max_sect;
519
520         /*
521         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
522                  sect_in, device->rs_in_flight, want, correction,
523                  steps, cps, device->rs_planed, curr_corr, req_sect);
524         */
525
526         return req_sect;
527 }
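#if 0
/* Illustrative only (never compiled): a sketch of one drbd_rs_controller() cycle
 * with made-up numbers, assuming SLEEP_TIME of 100 ms so that the "want" formula
 * reduces to sect_in * c_delay_target. */
static int drbd_rs_controller_example(void)
{
	int steps = 10;					/* plan->size */
	int sect_in = 800;				/* sectors that came in during the last cycle */
	int rs_in_flight = 6000;			/* resync sectors currently in flight */
	int plan_total = 1000;				/* sum of already planned corrections */
	int c_delay_target = 10;			/* tenths of a second of data to keep in flight */

	int want = sect_in * c_delay_target;		/* 8000 sectors wanted in flight */
	int correction = want - rs_in_flight - plan_total;	/* 1000 sectors short */
	int cps = correction / steps;			/* spread 100 sectors over each planned step */
	int curr_corr = 50 + cps;			/* fifo_push() pops this step's plan, e.g. 150 */

	return sect_in + curr_corr;			/* request 950 sectors this cycle (clamped by c_max_rate) */
}
#endif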
528
529 static int drbd_rs_number_requests(struct drbd_device *device)
530 {
531         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
532         int number, mxb;
533
534         sect_in = atomic_xchg(&device->rs_sect_in, 0);
535         device->rs_in_flight -= sect_in;
536
537         rcu_read_lock();
538         mxb = drbd_get_max_buffers(device) / 2;
539         if (rcu_dereference(device->rs_plan_s)->size) {
540                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
541                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
542         } else {
543                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
544                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
545         }
546         rcu_read_unlock();
547
548         /* Don't have more than "max-buffers"/2 in-flight.
549          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
550          * potentially causing a distributed deadlock on congestion during
551          * online-verify or (checksum-based) resync, if max-buffers,
552          * socket buffer sizes and resync rate settings are mis-configured. */
553
554         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
555          * mxb (as used here, and in drbd_alloc_pages on the peer) is
556          * "number of pages" (typically also 4k),
557          * but "rs_in_flight" is in "sectors" (512 bytes). */
558         if (mxb - device->rs_in_flight/8 < number)
559                 number = mxb - device->rs_in_flight/8;
560
561         return number;
562 }
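/* Unit check for the cap above, with illustrative numbers: "number" and mxb are in
 * 4 KiB units (bitmap blocks resp. pages) while rs_in_flight is in 512-byte
 * sectors, hence the division by 8.  With max-buffers = 4000 (mxb = 2000) and
 * 8000 sectors (1000 blocks) still in flight, at most 1000 further requests are
 * issued in this cycle. */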
563
564 static int make_resync_request(struct drbd_device *const device, int cancel)
565 {
566         struct drbd_peer_device *const peer_device = first_peer_device(device);
567         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
568         unsigned long bit;
569         sector_t sector;
570         const sector_t capacity = drbd_get_capacity(device->this_bdev);
571         int max_bio_size;
572         int number, rollback_i, size;
573         int align, requeue = 0;
574         int i = 0;
575
576         if (unlikely(cancel))
577                 return 0;
578
579         if (device->rs_total == 0) {
580                 /* empty resync? */
581                 drbd_resync_finished(device);
582                 return 0;
583         }
584
585         if (!get_ldev(device)) {
586                 /* Since we only need to access device->rsync a
587                    get_ldev_if_state(device,D_FAILED) would be sufficient, but
588                    continuing the resync with a broken disk makes no sense at
589                    all */
590                 drbd_err(device, "Disk broke down during resync!\n");
591                 return 0;
592         }
593
594         max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
595         number = drbd_rs_number_requests(device);
596         if (number <= 0)
597                 goto requeue;
598
599         for (i = 0; i < number; i++) {
600                 /* Stop generating RS requests when half of the send buffer is filled,
601                  * but notify TCP that we'd like to have more space. */
602                 mutex_lock(&connection->data.mutex);
603                 if (connection->data.socket) {
604                         struct sock *sk = connection->data.socket->sk;
605                         int queued = sk->sk_wmem_queued;
606                         int sndbuf = sk->sk_sndbuf;
607                         if (queued > sndbuf / 2) {
608                                 requeue = 1;
609                                 if (sk->sk_socket)
610                                         set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
611                         }
612                 } else
613                         requeue = 1;
614                 mutex_unlock(&connection->data.mutex);
615                 if (requeue)
616                         goto requeue;
617
618 next_sector:
619                 size = BM_BLOCK_SIZE;
620                 bit  = drbd_bm_find_next(device, device->bm_resync_fo);
621
622                 if (bit == DRBD_END_OF_BITMAP) {
623                         device->bm_resync_fo = drbd_bm_bits(device);
624                         put_ldev(device);
625                         return 0;
626                 }
627
628                 sector = BM_BIT_TO_SECT(bit);
629
630                 if (drbd_try_rs_begin_io(device, sector)) {
631                         device->bm_resync_fo = bit;
632                         goto requeue;
633                 }
634                 device->bm_resync_fo = bit + 1;
635
636                 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
637                         drbd_rs_complete_io(device, sector);
638                         goto next_sector;
639                 }
640
641 #if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
642                 /* try to find some adjacent bits.
643                  * we stop if we have already the maximum req size.
644                  *
645                  * Additionally always align bigger requests, in order to
646                  * be prepared for all stripe sizes of software RAIDs.
647                  */
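		/* "align" counts doublings of BM_BLOCK_SIZE; in the alignment check
		 * below, the "+3" converts that 4 KiB granularity into 512-byte
		 * sectors (2^3 sectors per block), so a request only keeps growing
		 * while its start sector stays aligned to the candidate size. */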
648                 align = 1;
649                 rollback_i = i;
650                 while (i < number) {
651                         if (size + BM_BLOCK_SIZE > max_bio_size)
652                                 break;
653
654                         /* Be always aligned */
655                         if (sector & ((1<<(align+3))-1))
656                                 break;
657
658                         /* do not cross extent boundaries */
659                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
660                                 break;
661                         /* now, is it actually dirty, after all?
662                          * caution, drbd_bm_test_bit is tri-state for some
663                          * obscure reason; ( b == 0 ) would get the out-of-band
664                          * only accidentally right because of the "oddly sized"
665                          * adjustment below */
666                         if (drbd_bm_test_bit(device, bit+1) != 1)
667                                 break;
668                         bit++;
669                         size += BM_BLOCK_SIZE;
670                         if ((BM_BLOCK_SIZE << align) <= size)
671                                 align++;
672                         i++;
673                 }
674                 /* if we merged some,
675                  * reset the offset to start the next drbd_bm_find_next from */
676                 if (size > BM_BLOCK_SIZE)
677                         device->bm_resync_fo = bit + 1;
678 #endif
679
680                 /* adjust very last sectors, in case we are oddly sized */
681                 if (sector + (size>>9) > capacity)
682                         size = (capacity-sector)<<9;
683
684                 if (device->use_csums) {
685                         switch (read_for_csum(peer_device, sector, size)) {
686                         case -EIO: /* Disk failure */
687                                 put_ldev(device);
688                                 return -EIO;
689                         case -EAGAIN: /* allocation failed, or ldev busy */
690                                 drbd_rs_complete_io(device, sector);
691                                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
692                                 i = rollback_i;
693                                 goto requeue;
694                         case 0:
695                                 /* everything ok */
696                                 break;
697                         default:
698                                 BUG();
699                         }
700                 } else {
701                         int err;
702
703                         inc_rs_pending(device);
704                         err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
705                                                  sector, size, ID_SYNCER);
706                         if (err) {
707                                 drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
708                                 dec_rs_pending(device);
709                                 put_ldev(device);
710                                 return err;
711                         }
712                 }
713         }
714
715         if (device->bm_resync_fo >= drbd_bm_bits(device)) {
716                 /* last syncer _request_ was sent,
717                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
718                  * next sync group will resume), as soon as we receive the last
719                  * resync data block, and the last bit is cleared.
720                  * until then resync "work" is "inactive" ...
721                  */
722                 put_ldev(device);
723                 return 0;
724         }
725
726  requeue:
727         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
728         mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
729         put_ldev(device);
730         return 0;
731 }
732
733 static int make_ov_request(struct drbd_device *device, int cancel)
734 {
735         int number, i, size;
736         sector_t sector;
737         const sector_t capacity = drbd_get_capacity(device->this_bdev);
738         bool stop_sector_reached = false;
739
740         if (unlikely(cancel))
741                 return 1;
742
743         number = drbd_rs_number_requests(device);
744
745         sector = device->ov_position;
746         for (i = 0; i < number; i++) {
747                 if (sector >= capacity)
748                         return 1;
749
750                 /* We check for "finished" only in the reply path:
751                  * w_e_end_ov_reply().
752                  * We need to send at least one request out. */
753                 stop_sector_reached = i > 0
754                         && verify_can_do_stop_sector(device)
755                         && sector >= device->ov_stop_sector;
756                 if (stop_sector_reached)
757                         break;
758
759                 size = BM_BLOCK_SIZE;
760
761                 if (drbd_try_rs_begin_io(device, sector)) {
762                         device->ov_position = sector;
763                         goto requeue;
764                 }
765
766                 if (sector + (size>>9) > capacity)
767                         size = (capacity-sector)<<9;
768
769                 inc_rs_pending(device);
770                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
771                         dec_rs_pending(device);
772                         return 0;
773                 }
774                 sector += BM_SECT_PER_BIT;
775         }
776         device->ov_position = sector;
777
778  requeue:
779         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
780         if (i == 0 || !stop_sector_reached)
781                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
782         return 1;
783 }
784
785 int w_ov_finished(struct drbd_work *w, int cancel)
786 {
787         struct drbd_device_work *dw =
788                 container_of(w, struct drbd_device_work, w);
789         struct drbd_device *device = dw->device;
790         kfree(dw);
791         ov_out_of_sync_print(device);
792         drbd_resync_finished(device);
793
794         return 0;
795 }
796
797 static int w_resync_finished(struct drbd_work *w, int cancel)
798 {
799         struct drbd_device_work *dw =
800                 container_of(w, struct drbd_device_work, w);
801         struct drbd_device *device = dw->device;
802         kfree(dw);
803
804         drbd_resync_finished(device);
805
806         return 0;
807 }
808
809 static void ping_peer(struct drbd_device *device)
810 {
811         struct drbd_connection *connection = first_peer_device(device)->connection;
812
813         clear_bit(GOT_PING_ACK, &connection->flags);
814         request_ping(connection);
815         wait_event(connection->ping_wait,
816                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
817 }
818
819 int drbd_resync_finished(struct drbd_device *device)
820 {
821         unsigned long db, dt, dbdt;
822         unsigned long n_oos;
823         union drbd_state os, ns;
824         struct drbd_device_work *dw;
825         char *khelper_cmd = NULL;
826         int verify_done = 0;
827
828         /* Remove all elements from the resync LRU. Since future actions
829          * might set bits in the (main) bitmap, the entries in the
830          * resync LRU would otherwise be wrong. */
831         if (drbd_rs_del_all(device)) {
832                 /* In case this is not possible now, most probably because
833                  * there are P_RS_DATA_REPLY packets lingering on the worker's
834                  * queue (or the read operations for those packets
835                  * are not finished yet).  Retry in 100ms. */
836
837                 schedule_timeout_interruptible(HZ / 10);
838                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
839                 if (dw) {
840                         dw->w.cb = w_resync_finished;
841                         dw->device = device;
842                         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
843                                         &dw->w);
844                         return 1;
845                 }
846                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
847         }
848
849         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
850         if (dt <= 0)
851                 dt = 1;
852
853         db = device->rs_total;
854         /* adjust for verify start and stop sectors, respective reached position */
855         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
856                 db -= device->ov_left;
857
858         dbdt = Bit2KB(db/dt);
859         device->rs_paused /= HZ;
860
861         if (!get_ldev(device))
862                 goto out;
863
864         ping_peer(device);
865
866         spin_lock_irq(&device->resource->req_lock);
867         os = drbd_read_state(device);
868
869         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
870
871         /* This protects us against multiple calls (that can happen in the presence
872            of application IO), and against connectivity loss just before we arrive here. */
873         if (os.conn <= C_CONNECTED)
874                 goto out_unlock;
875
876         ns = os;
877         ns.conn = C_CONNECTED;
878
879         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
880              verify_done ? "Online verify" : "Resync",
881              dt + device->rs_paused, device->rs_paused, dbdt);
882
883         n_oos = drbd_bm_total_weight(device);
884
885         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
886                 if (n_oos) {
887                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
888                               n_oos, Bit2KB(1));
889                         khelper_cmd = "out-of-sync";
890                 }
891         } else {
892                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
893
894                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
895                         khelper_cmd = "after-resync-target";
896
897                 if (device->use_csums && device->rs_total) {
898                         const unsigned long s = device->rs_same_csum;
899                         const unsigned long t = device->rs_total;
900                         const int ratio =
901                                 (t == 0)     ? 0 :
902                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
903                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
904                              "transferred %luK total %luK\n",
905                              ratio,
906                              Bit2KB(device->rs_same_csum),
907                              Bit2KB(device->rs_total - device->rs_same_csum),
908                              Bit2KB(device->rs_total));
909                 }
910         }
911
912         if (device->rs_failed) {
913                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
914
915                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
916                         ns.disk = D_INCONSISTENT;
917                         ns.pdsk = D_UP_TO_DATE;
918                 } else {
919                         ns.disk = D_UP_TO_DATE;
920                         ns.pdsk = D_INCONSISTENT;
921                 }
922         } else {
923                 ns.disk = D_UP_TO_DATE;
924                 ns.pdsk = D_UP_TO_DATE;
925
926                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
927                         if (device->p_uuid) {
928                                 int i;
929                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
930                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
931                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
932                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
933                         } else {
934                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
935                         }
936                 }
937
938                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
939                         /* for verify runs, we don't update uuids here,
940                          * so there would be nothing to report. */
941                         drbd_uuid_set_bm(device, 0UL);
942                         drbd_print_uuids(device, "updated UUIDs");
943                         if (device->p_uuid) {
944                                 /* Now the two UUID sets are equal, update what we
945                                  * know of the peer. */
946                                 int i;
947                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
948                                         device->p_uuid[i] = device->ldev->md.uuid[i];
949                         }
950                 }
951         }
952
953         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
954 out_unlock:
955         spin_unlock_irq(&device->resource->req_lock);
956         put_ldev(device);
957 out:
958         device->rs_total  = 0;
959         device->rs_failed = 0;
960         device->rs_paused = 0;
961
962         /* reset start sector, if we reached end of device */
963         if (verify_done && device->ov_left == 0)
964                 device->ov_start_sector = 0;
965
966         drbd_md_sync(device);
967
968         if (khelper_cmd)
969                 drbd_khelper(device, khelper_cmd);
970
971         return 1;
972 }
973
974 /* helper */
975 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
976 {
977         if (drbd_peer_req_has_active_page(peer_req)) {
978                 /* This might happen if sendpage() has not finished */
979                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
980                 atomic_add(i, &device->pp_in_use_by_net);
981                 atomic_sub(i, &device->pp_in_use);
982                 spin_lock_irq(&device->resource->req_lock);
983                 list_add_tail(&peer_req->w.list, &device->net_ee);
984                 spin_unlock_irq(&device->resource->req_lock);
985                 wake_up(&drbd_pp_wait);
986         } else
987                 drbd_free_peer_req(device, peer_req);
988 }
989
990 /**
991  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
992  * @w:          work object.
993  * @cancel:     The connection will be closed anyways
994  *
995  */
996 int w_e_end_data_req(struct drbd_work *w, int cancel)
997 {
998         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
999         struct drbd_peer_device *peer_device = peer_req->peer_device;
1000         struct drbd_device *device = peer_device->device;
1001         int err;
1002
1003         if (unlikely(cancel)) {
1004                 drbd_free_peer_req(device, peer_req);
1005                 dec_unacked(device);
1006                 return 0;
1007         }
1008
1009         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1010                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1011         } else {
1012                 if (__ratelimit(&drbd_ratelimit_state))
1013                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1014                             (unsigned long long)peer_req->i.sector);
1015
1016                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1017         }
1018
1019         dec_unacked(device);
1020
1021         move_to_net_ee_or_free(device, peer_req);
1022
1023         if (unlikely(err))
1024                 drbd_err(device, "drbd_send_block() failed\n");
1025         return err;
1026 }
1027
1028 /**
1029  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1030  * @w:          work object.
1031  * @cancel:     The connection will be closed anyways
1032  */
1033 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1034 {
1035         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1036         struct drbd_peer_device *peer_device = peer_req->peer_device;
1037         struct drbd_device *device = peer_device->device;
1038         int err;
1039
1040         if (unlikely(cancel)) {
1041                 drbd_free_peer_req(device, peer_req);
1042                 dec_unacked(device);
1043                 return 0;
1044         }
1045
1046         if (get_ldev_if_state(device, D_FAILED)) {
1047                 drbd_rs_complete_io(device, peer_req->i.sector);
1048                 put_ldev(device);
1049         }
1050
1051         if (device->state.conn == C_AHEAD) {
1052                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1053         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1054                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1055                         inc_rs_pending(device);
1056                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1057                 } else {
1058                         if (__ratelimit(&drbd_ratelimit_state))
1059                                 drbd_err(device, "Not sending RSDataReply, "
1060                                     "partner DISKLESS!\n");
1061                         err = 0;
1062                 }
1063         } else {
1064                 if (__ratelimit(&drbd_ratelimit_state))
1065                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1066                             (unsigned long long)peer_req->i.sector);
1067
1068                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1069
1070                 /* update resync data with failure */
1071                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1072         }
1073
1074         dec_unacked(device);
1075
1076         move_to_net_ee_or_free(device, peer_req);
1077
1078         if (unlikely(err))
1079                 drbd_err(device, "drbd_send_block() failed\n");
1080         return err;
1081 }
1082
1083 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1084 {
1085         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1086         struct drbd_peer_device *peer_device = peer_req->peer_device;
1087         struct drbd_device *device = peer_device->device;
1088         struct digest_info *di;
1089         int digest_size;
1090         void *digest = NULL;
1091         int err, eq = 0;
1092
1093         if (unlikely(cancel)) {
1094                 drbd_free_peer_req(device, peer_req);
1095                 dec_unacked(device);
1096                 return 0;
1097         }
1098
1099         if (get_ldev(device)) {
1100                 drbd_rs_complete_io(device, peer_req->i.sector);
1101                 put_ldev(device);
1102         }
1103
1104         di = peer_req->digest;
1105
1106         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1107                 /* quick hack to try to avoid a race against reconfiguration.
1108                  * a real fix would be much more involved,
1109                  * introducing more locking mechanisms */
1110                 if (peer_device->connection->csums_tfm) {
1111                         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1112                         D_ASSERT(device, digest_size == di->digest_size);
1113                         digest = kmalloc(digest_size, GFP_NOIO);
1114                 }
1115                 if (digest) {
1116                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1117                         eq = !memcmp(digest, di->digest, digest_size);
1118                         kfree(digest);
1119                 }
1120
1121                 if (eq) {
1122                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1123                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1124                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1125                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1126                 } else {
1127                         inc_rs_pending(device);
1128                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1129                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1130                         kfree(di);
1131                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1132                 }
1133         } else {
1134                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1135                 if (__ratelimit(&drbd_ratelimit_state))
1136                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1137         }
1138
1139         dec_unacked(device);
1140         move_to_net_ee_or_free(device, peer_req);
1141
1142         if (unlikely(err))
1143                 drbd_err(device, "drbd_send_block/ack() failed\n");
1144         return err;
1145 }
1146
1147 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1148 {
1149         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1150         struct drbd_peer_device *peer_device = peer_req->peer_device;
1151         struct drbd_device *device = peer_device->device;
1152         sector_t sector = peer_req->i.sector;
1153         unsigned int size = peer_req->i.size;
1154         int digest_size;
1155         void *digest;
1156         int err = 0;
1157
1158         if (unlikely(cancel))
1159                 goto out;
1160
1161         digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1162         digest = kmalloc(digest_size, GFP_NOIO);
1163         if (!digest) {
1164                 err = 1;        /* terminate the connection in case the allocation failed */
1165                 goto out;
1166         }
1167
1168         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1169                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1170         else
1171                 memset(digest, 0, digest_size);
1172
1173         /* Free e and pages before send.
1174          * In case we block on congestion, we could otherwise run into
1175          * some distributed deadlock, if the other side blocks on
1176          * congestion as well, because our receiver blocks in
1177          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1178         drbd_free_peer_req(device, peer_req);
1179         peer_req = NULL;
1180         inc_rs_pending(device);
1181         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1182         if (err)
1183                 dec_rs_pending(device);
1184         kfree(digest);
1185
1186 out:
1187         if (peer_req)
1188                 drbd_free_peer_req(device, peer_req);
1189         dec_unacked(device);
1190         return err;
1191 }
1192
1193 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1194 {
1195         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1196                 device->ov_last_oos_size += size>>9;
1197         } else {
1198                 device->ov_last_oos_start = sector;
1199                 device->ov_last_oos_size = size>>9;
1200         }
1201         drbd_set_out_of_sync(device, sector, size);
1202 }
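/* Note: adjacent out-of-sync blocks found by online verify are merged into one
 * contiguous range here, so that ov_out_of_sync_print() can report a single start
 * sector and size per run. */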
1203
1204 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1205 {
1206         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1207         struct drbd_peer_device *peer_device = peer_req->peer_device;
1208         struct drbd_device *device = peer_device->device;
1209         struct digest_info *di;
1210         void *digest;
1211         sector_t sector = peer_req->i.sector;
1212         unsigned int size = peer_req->i.size;
1213         int digest_size;
1214         int err, eq = 0;
1215         bool stop_sector_reached = false;
1216
1217         if (unlikely(cancel)) {
1218                 drbd_free_peer_req(device, peer_req);
1219                 dec_unacked(device);
1220                 return 0;
1221         }
1222
1223         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1224          * the resync lru has been cleaned up already */
1225         if (get_ldev(device)) {
1226                 drbd_rs_complete_io(device, peer_req->i.sector);
1227                 put_ldev(device);
1228         }
1229
1230         di = peer_req->digest;
1231
1232         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1233                 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1234                 digest = kmalloc(digest_size, GFP_NOIO);
1235                 if (digest) {
1236                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1237
1238                         D_ASSERT(device, digest_size == di->digest_size);
1239                         eq = !memcmp(digest, di->digest, digest_size);
1240                         kfree(digest);
1241                 }
1242         }
1243
1244         /* Free peer_req and pages before send.
1245          * In case we block on congestion, we could otherwise run into
1246          * some distributed deadlock, if the other side blocks on
1247          * congestion as well, because our receiver blocks in
1248          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1249         drbd_free_peer_req(device, peer_req);
1250         if (!eq)
1251                 drbd_ov_out_of_sync_found(device, sector, size);
1252         else
1253                 ov_out_of_sync_print(device);
1254
1255         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1256                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1257
1258         dec_unacked(device);
1259
1260         --device->ov_left;
1261
1262         /* let's advance progress step marks only for every other megabyte */
1263         if ((device->ov_left & 0x200) == 0x200)
1264                 drbd_advance_rs_marks(device, device->ov_left);
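	/* (ov_left counts 4 KiB bitmap blocks; bit 0x200 flips every 512 blocks,
	 * i.e. every 2 MiB, so the marks are only advanced during every other
	 * 2 MiB of verified data) */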
1265
1266         stop_sector_reached = verify_can_do_stop_sector(device) &&
1267                 (sector + (size>>9)) >= device->ov_stop_sector;
1268
1269         if (device->ov_left == 0 || stop_sector_reached) {
1270                 ov_out_of_sync_print(device);
1271                 drbd_resync_finished(device);
1272         }
1273
1274         return err;
1275 }
1276
1277 /* FIXME
1278  * We need to track the number of pending barrier acks,
1279  * and to be able to wait for them.
1280  * See also comment in drbd_adm_attach before drbd_suspend_io.
1281  */
1282 static int drbd_send_barrier(struct drbd_connection *connection)
1283 {
1284         struct p_barrier *p;
1285         struct drbd_socket *sock;
1286
1287         sock = &connection->data;
1288         p = conn_prepare_command(connection, sock);
1289         if (!p)
1290                 return -EIO;
1291         p->barrier = connection->send.current_epoch_nr;
1292         p->pad = 0;
1293         connection->send.current_epoch_writes = 0;
1294
1295         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1296 }
1297
1298 int w_send_write_hint(struct drbd_work *w, int cancel)
1299 {
1300         struct drbd_device *device =
1301                 container_of(w, struct drbd_device, unplug_work);
1302         struct drbd_socket *sock;
1303
1304         if (cancel)
1305                 return 0;
1306         sock = &first_peer_device(device)->connection->data;
1307         if (!drbd_prepare_command(first_peer_device(device), sock))
1308                 return -EIO;
1309         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1310 }
1311
1312 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1313 {
1314         if (!connection->send.seen_any_write_yet) {
1315                 connection->send.seen_any_write_yet = true;
1316                 connection->send.current_epoch_nr = epoch;
1317                 connection->send.current_epoch_writes = 0;
1318         }
1319 }
1320
1321 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1322 {
1323         /* no write seen yet on this connection: nothing to close with a barrier */
1324         if (!connection->send.seen_any_write_yet)
1325                 return;
1326         if (connection->send.current_epoch_nr != epoch) {
1327                 if (connection->send.current_epoch_writes)
1328                         drbd_send_barrier(connection);
1329                 connection->send.current_epoch_nr = epoch;
1330         }
1331 }
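/* Epoch bookkeeping for the sender path: re_init_if_first_write() starts epoch
 * tracking with the first mirrored write on a connection, and maybe_send_barrier()
 * closes the previous epoch with a P_BARRIER only when the epoch number changes
 * and that epoch actually contained writes. */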
1332
1333 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1334 {
1335         struct drbd_request *req = container_of(w, struct drbd_request, w);
1336         struct drbd_device *device = req->device;
1337         struct drbd_peer_device *const peer_device = first_peer_device(device);
1338         struct drbd_connection *const connection = peer_device->connection;
1339         int err;
1340
1341         if (unlikely(cancel)) {
1342                 req_mod(req, SEND_CANCELED);
1343                 return 0;
1344         }
1345         req->pre_send_jif = jiffies;
1346
1347         /* this time, no connection->send.current_epoch_writes++;
1348          * If it was sent, it was the closing barrier for the last
1349          * replicated epoch, before we went into AHEAD mode.
1350          * No more barriers will be sent, until we leave AHEAD mode again. */
1351         maybe_send_barrier(connection, req->epoch);
1352
1353         err = drbd_send_out_of_sync(peer_device, req);
1354         req_mod(req, OOS_HANDED_TO_NETWORK);
1355
1356         return err;
1357 }
1358
1359 /**
1360  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1361  * @w:          work object.
1362  * @cancel:     The connection will be closed anyway
1363  */
1364 int w_send_dblock(struct drbd_work *w, int cancel)
1365 {
1366         struct drbd_request *req = container_of(w, struct drbd_request, w);
1367         struct drbd_device *device = req->device;
1368         struct drbd_peer_device *const peer_device = first_peer_device(device);
1369         struct drbd_connection *connection = peer_device->connection;
1370         int err;
1371
1372         if (unlikely(cancel)) {
1373                 req_mod(req, SEND_CANCELED);
1374                 return 0;
1375         }
1376         req->pre_send_jif = jiffies;
1377
1378         re_init_if_first_write(connection, req->epoch);
1379         maybe_send_barrier(connection, req->epoch);
1380         connection->send.current_epoch_writes++;
1381
1382         err = drbd_send_dblock(peer_device, req);
1383         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1384
1385         return err;
1386 }
1387
1388 /**
1389  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1390  * @w:          work object.
1391  * @cancel:     The connection will be closed anyway
1392  */
1393 int w_send_read_req(struct drbd_work *w, int cancel)
1394 {
1395         struct drbd_request *req = container_of(w, struct drbd_request, w);
1396         struct drbd_device *device = req->device;
1397         struct drbd_peer_device *const peer_device = first_peer_device(device);
1398         struct drbd_connection *connection = peer_device->connection;
1399         int err;
1400
1401         if (unlikely(cancel)) {
1402                 req_mod(req, SEND_CANCELED);
1403                 return 0;
1404         }
1405         req->pre_send_jif = jiffies;
1406
1407         /* Even read requests may close a write epoch,
1408          * if there was any yet. */
1409         maybe_send_barrier(connection, req->epoch);
1410
1411         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1412                                  (unsigned long)req);
1413
1414         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1415
1416         return err;
1417 }
1418
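/* Re-submit the request's private bio to the local backing device; writes
 * that are accounted in the activity log re-acquire their AL extent first. */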
1419 int w_restart_disk_io(struct drbd_work *w, int cancel)
1420 {
1421         struct drbd_request *req = container_of(w, struct drbd_request, w);
1422         struct drbd_device *device = req->device;
1423
1424         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1425                 drbd_al_begin_io(device, &req->i);
1426
1427         drbd_req_make_private_bio(req, req->master_bio);
1428         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1429         generic_make_request(req->private_bio);
1430
1431         return 0;
1432 }
1433
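/* Follow the resync-after dependency chain of the given device.
 * Return 1 if it may resync now, 0 if some device it (transitively) depends
 * on is currently resyncing or has its resync paused. */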
1434 static int _drbd_may_sync_now(struct drbd_device *device)
1435 {
1436         struct drbd_device *odev = device;
1437         int resync_after;
1438
1439         while (1) {
1440                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1441                         return 1;
1442                 rcu_read_lock();
1443                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1444                 rcu_read_unlock();
1445                 if (resync_after == -1)
1446                         return 1;
1447                 odev = minor_to_device(resync_after);
1448                 if (!odev)
1449                         return 1;
1450                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1451                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1452                     odev->state.aftr_isp || odev->state.peer_isp ||
1453                     odev->state.user_isp)
1454                         return 0;
1455         }
1456 }
1457
1458 /**
1459  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1460  * @device:     DRBD device.
1461  *
1462  * Called from process context only (admin command and after_state_ch).
1463  */
1464 static int _drbd_pause_after(struct drbd_device *device)
1465 {
1466         struct drbd_device *odev;
1467         int i, rv = 0;
1468
1469         rcu_read_lock();
1470         idr_for_each_entry(&drbd_devices, odev, i) {
1471                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1472                         continue;
1473                 if (!_drbd_may_sync_now(odev))
1474                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1475                                != SS_NOTHING_TO_DO);
1476         }
1477         rcu_read_unlock();
1478
1479         return rv;
1480 }
1481
1482 /**
1483  * _drbd_resume_next() - Resume resync on all devices that may resync now
1484  * @device:     DRBD device.
1485  *
1486  * Called from process context only (admin command and worker).
1487  */
1488 static int _drbd_resume_next(struct drbd_device *device)
1489 {
1490         struct drbd_device *odev;
1491         int i, rv = 0;
1492
1493         rcu_read_lock();
1494         idr_for_each_entry(&drbd_devices, odev, i) {
1495                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1496                         continue;
1497                 if (odev->state.aftr_isp) {
1498                         if (_drbd_may_sync_now(odev))
1499                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1500                                                         CS_HARD, NULL)
1501                                        != SS_NOTHING_TO_DO);
1502                 }
1503         }
1504         rcu_read_unlock();
1505         return rv;
1506 }
1507
1508 void resume_next_sg(struct drbd_device *device)
1509 {
1510         write_lock_irq(&global_state_lock);
1511         _drbd_resume_next(device);
1512         write_unlock_irq(&global_state_lock);
1513 }
1514
1515 void suspend_other_sg(struct drbd_device *device)
1516 {
1517         write_lock_irq(&global_state_lock);
1518         _drbd_pause_after(device);
1519         write_unlock_irq(&global_state_lock);
1520 }
1521
1522 /* caller must hold global_state_lock */
1523 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1524 {
1525         struct drbd_device *odev;
1526         int resync_after;
1527
1528         if (o_minor == -1)
1529                 return NO_ERROR;
1530         if (o_minor < -1 || o_minor > MINORMASK)
1531                 return ERR_RESYNC_AFTER;
1532
1533         /* check for loops */
1534         odev = minor_to_device(o_minor);
1535         while (1) {
1536                 if (odev == device)
1537                         return ERR_RESYNC_AFTER_CYCLE;
1538
1539                 /* You are free to depend on diskless, non-existing,
1540                  * or not yet/no longer existing minors.
1541                  * We only reject dependency loops.
1542                  * We cannot follow the dependency chain beyond a detached or
1543                  * missing minor.
1544                  */
1545                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1546                         return NO_ERROR;
1547
1548                 rcu_read_lock();
1549                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1550                 rcu_read_unlock();
1551                 /* dependency chain ends here, no cycles. */
1552                 if (resync_after == -1)
1553                         return NO_ERROR;
1554
1555                 /* follow the dependency chain */
1556                 odev = minor_to_device(resync_after);
1557         }
1558 }
1559
1560 /* caller must hold global_state_lock */
1561 void drbd_resync_after_changed(struct drbd_device *device)
1562 {
1563         int changes;
1564
1565         do {
1566                 changes  = _drbd_pause_after(device);
1567                 changes |= _drbd_resume_next(device);
1568         } while (changes);
1569 }
1570
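/* Reset the input of the dynamic resync rate controller: sector and event
 * counters, in-flight accounting, and the (RCU protected) resync plan fifo. */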
1571 void drbd_rs_controller_reset(struct drbd_device *device)
1572 {
1573         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1574         struct fifo_buffer *plan;
1575
1576         atomic_set(&device->rs_sect_in, 0);
1577         atomic_set(&device->rs_sect_ev, 0);
1578         device->rs_in_flight = 0;
1579         device->rs_last_events =
1580                 (int)part_stat_read(&disk->part0, sectors[0]) +
1581                 (int)part_stat_read(&disk->part0, sectors[1]);
1582
1583         /* Updating the RCU protected object in place is necessary since
1584            this function gets called from atomic context.
1585            It is valid since all other updates also lead to a completely
1586            empty fifo */
1587         rcu_read_lock();
1588         plan = rcu_dereference(device->rs_plan_s);
1589         plan->total = 0;
1590         fifo_set(plan, 0);
1591         rcu_read_unlock();
1592 }
1593
1594 void start_resync_timer_fn(unsigned long data)
1595 {
1596         struct drbd_device *device = (struct drbd_device *) data;
1597         drbd_device_post_work(device, RS_START);
1598 }
1599
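/* Handle the RS_START device work bit: retry a bit later while acks or resync
 * replies are still pending, otherwise leave Ahead mode and become SyncSource. */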
1600 static void do_start_resync(struct drbd_device *device)
1601 {
1602         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1603                 drbd_warn(device, "postponing start_resync ...\n");
1604                 device->start_resync_timer.expires = jiffies + HZ/10;
1605                 add_timer(&device->start_resync_timer);
1606                 return;
1607         }
1608
1609         drbd_start_resync(device, C_SYNC_SOURCE);
1610         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1611 }
1612
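/* Decide whether this resync may use checksum based block comparison. */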
1613 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1614 {
1615         bool csums_after_crash_only;
1616         rcu_read_lock();
1617         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1618         rcu_read_unlock();
1619         return connection->agreed_pro_version >= 89 &&          /* supported? */
1620                 connection->csums_tfm &&                        /* configured? */
1621                 (csums_after_crash_only == 0                    /* use for each resync? */
1622                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1623 }
1624
1625 /**
1626  * drbd_start_resync() - Start the resync process
1627  * @device:     DRBD device.
1628  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1629  *
1630  * This function might bring you directly into one of the
1631  * C_PAUSED_SYNC_* states.
1632  */
1633 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1634 {
1635         struct drbd_peer_device *peer_device = first_peer_device(device);
1636         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1637         union drbd_state ns;
1638         int r;
1639
1640         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1641                 drbd_err(device, "Resync already running!\n");
1642                 return;
1643         }
1644
1645         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1646                 if (side == C_SYNC_TARGET) {
1647                         /* Since application IO was locked out during C_WF_BITMAP_T and
1648                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1649                            we run the before-resync-target handler, which may veto by its exit code. */
1650                         r = drbd_khelper(device, "before-resync-target");
1651                         r = (r >> 8) & 0xff;
1652                         if (r > 0) {
1653                                 drbd_info(device, "before-resync-target handler returned %d, "
1654                                          "dropping connection.\n", r);
1655                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1656                                 return;
1657                         }
1658                 } else /* C_SYNC_SOURCE */ {
1659                         r = drbd_khelper(device, "before-resync-source");
1660                         r = (r >> 8) & 0xff;
1661                         if (r > 0) {
1662                                 if (r == 3) {
1663                                         drbd_info(device, "before-resync-source handler returned %d, "
1664                                                  "ignoring. Old userland tools?\n", r);
1665                                 } else {
1666                                         drbd_info(device, "before-resync-source handler returned %d, "
1667                                                  "dropping connection.\n", r);
1668                                         conn_request_state(connection,
1669                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1670                                         return;
1671                                 }
1672                         }
1673                 }
1674         }
1675
1676         if (current == connection->worker.task) {
1677                 /* The worker should not sleep waiting for state_mutex,
1678                    because that can take a long time */
1679                 if (!mutex_trylock(device->state_mutex)) {
1680                         set_bit(B_RS_H_DONE, &device->flags);
1681                         device->start_resync_timer.expires = jiffies + HZ/5;
1682                         add_timer(&device->start_resync_timer);
1683                         return;
1684                 }
1685         } else {
1686                 mutex_lock(device->state_mutex);
1687         }
1688         clear_bit(B_RS_H_DONE, &device->flags);
1689
1690         /* req_lock: serialize with drbd_send_and_submit() and others
1691          * global_state_lock: for stable sync-after dependencies */
1692         spin_lock_irq(&device->resource->req_lock);
1693         write_lock(&global_state_lock);
1694         /* Did some connection breakage or IO error race with us? */
1695         if (device->state.conn < C_CONNECTED
1696         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1697                 write_unlock(&global_state_lock);
1698                 spin_unlock_irq(&device->resource->req_lock);
1699                 mutex_unlock(device->state_mutex);
1700                 return;
1701         }
1702
1703         ns = drbd_read_state(device);
1704
1705         ns.aftr_isp = !_drbd_may_sync_now(device);
1706
1707         ns.conn = side;
1708
1709         if (side == C_SYNC_TARGET)
1710                 ns.disk = D_INCONSISTENT;
1711         else /* side == C_SYNC_SOURCE */
1712                 ns.pdsk = D_INCONSISTENT;
1713
1714         r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1715         ns = drbd_read_state(device);
1716
1717         if (ns.conn < C_CONNECTED)
1718                 r = SS_UNKNOWN_ERROR;
1719
1720         if (r == SS_SUCCESS) {
1721                 unsigned long tw = drbd_bm_total_weight(device);
1722                 unsigned long now = jiffies;
1723                 int i;
1724
1725                 device->rs_failed    = 0;
1726                 device->rs_paused    = 0;
1727                 device->rs_same_csum = 0;
1728                 device->rs_last_sect_ev = 0;
1729                 device->rs_total     = tw;
1730                 device->rs_start     = now;
1731                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1732                         device->rs_mark_left[i] = tw;
1733                         device->rs_mark_time[i] = now;
1734                 }
1735                 _drbd_pause_after(device);
1736                 /* Forget potentially stale cached per resync extent bit-counts.
1737                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1738                  * disabled, and know the disk state is ok. */
1739                 spin_lock(&device->al_lock);
1740                 lc_reset(device->resync);
1741                 device->resync_locked = 0;
1742                 device->resync_wenr = LC_FREE;
1743                 spin_unlock(&device->al_lock);
1744         }
1745         write_unlock(&global_state_lock);
1746         spin_unlock_irq(&device->resource->req_lock);
1747
1748         if (r == SS_SUCCESS) {
1749                 wake_up(&device->al_wait); /* for lc_reset() above */
1750                 /* reset rs_last_bcast when a resync or verify is started,
1751                  * to deal with potential jiffies wrap. */
1752                 device->rs_last_bcast = jiffies - HZ;
1753
1754                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1755                      drbd_conn_str(ns.conn),
1756                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1757                      (unsigned long) device->rs_total);
1758                 if (side == C_SYNC_TARGET) {
1759                         device->bm_resync_fo = 0;
1760                         device->use_csums = use_checksum_based_resync(connection, device);
1761                 } else {
1762                         device->use_csums = 0;
1763                 }
1764
1765                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1766                  * with w_send_oos, or the sync target will get confused as to
1767          * how many bits to resync.  We cannot do that always, because for an
1768                  * empty resync and protocol < 95, we need to do it here, as we call
1769                  * drbd_resync_finished from here in that case.
1770                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1771                  * and from after_state_ch otherwise. */
1772                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1773                         drbd_gen_and_send_sync_uuid(peer_device);
1774
1775                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1776                         /* This still has a race (about when exactly the peers
1777                          * detect connection loss) that can lead to a full sync
1778                          * on next handshake. In 8.3.9 we fixed this with explicit
1779                          * resync-finished notifications, but the fix
1780                          * introduces a protocol change.  Sleeping for some
1781                          * time longer than the ping interval + timeout on the
1782                          * SyncSource, to give the SyncTarget the chance to
1783                          * detect connection loss, then waiting for a ping
1784                          * response (implicit in drbd_resync_finished) reduces
1785                          * the race considerably, but does not solve it. */
1786                         if (side == C_SYNC_SOURCE) {
1787                                 struct net_conf *nc;
1788                                 int timeo;
1789
1790                                 rcu_read_lock();
1791                                 nc = rcu_dereference(connection->net_conf);
1792                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1793                                 rcu_read_unlock();
1794                                 schedule_timeout_interruptible(timeo);
1795                         }
1796                         drbd_resync_finished(device);
1797                 }
1798
1799                 drbd_rs_controller_reset(device);
1800                 /* ns.conn may already be != device->state.conn,
1801                  * we may have been paused in between, or become paused until
1802                  * the timer triggers.
1803                  * No matter, that is handled in resync_timer_fn() */
1804                 if (ns.conn == C_SYNC_TARGET)
1805                         mod_timer(&device->resync_timer, jiffies);
1806
1807                 drbd_md_sync(device);
1808         }
1809         put_ldev(device);
1810         mutex_unlock(device->state_mutex);
1811 }
1812
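/* Lazily write out changed bitmap pages and broadcast sync progress to
 * userspace; if the resync is done, also finish it while we still hold a
 * local reference. */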
1813 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1814 {
1815         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1816         device->rs_last_bcast = jiffies;
1817
1818         if (!get_ldev(device))
1819                 return;
1820
1821         drbd_bm_write_lazy(device, 0);
1822         if (resync_done && is_sync_state(device->state.conn))
1823                 drbd_resync_finished(device);
1824
1825         drbd_bcast_event(device, &sib);
1826         /* update timestamp, in case it took a while to write out stuff */
1827         device->rs_last_bcast = jiffies;
1828         put_ldev(device);
1829 }
1830
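/* Final part of detach: free the resync and activity log LRU caches and the
 * backing device, clear GOING_DISKLESS and wake up anyone waiting for it. */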
1831 static void drbd_ldev_destroy(struct drbd_device *device)
1832 {
1833         lc_destroy(device->resync);
1834         device->resync = NULL;
1835         lc_destroy(device->act_log);
1836         device->act_log = NULL;
1837
1838         __acquire(local);
1839         drbd_free_ldev(device->ldev);
1840         device->ldev = NULL;
1841         __release(local);
1842
1843         clear_bit(GOING_DISKLESS, &device->flags);
1844         wake_up(&device->misc_wait);
1845 }
1846
1847 static void go_diskless(struct drbd_device *device)
1848 {
1849         D_ASSERT(device, device->state.disk == D_FAILED);
1850         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1851          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1852          * the protected members anymore, though, so once local_cnt reaches zero
1853          * again, it will be safe to free them. */
1854
1855         /* Try to write changed bitmap pages, read errors may have just
1856          * set some bits outside the area covered by the activity log.
1857          *
1858          * If we have an IO error during the bitmap writeout,
1859          * we will want a full sync next time, just in case.
1860          * (Do we want a specific meta data flag for this?)
1861          *
1862          * If that does not make it to stable storage either,
1863          * we cannot do anything about that anymore.
1864          *
1865          * We still need to check if both bitmap and ldev are present, as we may
1866          * end up here after a failed attach, before ldev was even assigned.
1867          */
1868         if (device->bitmap && device->ldev) {
1869                 /* An interrupted resync or similar is allowed to re-count bits
1870                  * while we detach.
1871                  * Any modifications would not be expected anymore, though.
1872                  */
1873                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1874                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1875                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1876                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1877                                 drbd_md_sync(device);
1878                         }
1879                 }
1880         }
1881
1882         drbd_force_state(device, NS(disk, D_DISKLESS));
1883 }
1884
1885 static int do_md_sync(struct drbd_device *device)
1886 {
1887         drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1888         drbd_md_sync(device);
1889         return 0;
1890 }
1891
1892 /* only called from drbd_worker thread, no locking */
1893 void __update_timing_details(
1894                 struct drbd_thread_timing_details *tdp,
1895                 unsigned int *cb_nr,
1896                 void *cb,
1897                 const char *fn, const unsigned int line)
1898 {
1899         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1900         struct drbd_thread_timing_details *td = tdp + i;
1901
1902         td->start_jif = jiffies;
1903         td->cb_addr = cb;
1904         td->caller_fn = fn;
1905         td->line = line;
1906         td->cb_nr = *cb_nr;
1907
1908         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1909         td = tdp + i;
1910         memset(td, 0, sizeof(*td));
1911
1912         ++(*cb_nr);
1913 }
1914
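/* Dispatch the device work bits collected from device->flags to their
 * respective handlers. */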
1915 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1916 {
1917         if (test_bit(MD_SYNC, &todo))
1918                 do_md_sync(device);
1919         if (test_bit(RS_DONE, &todo) ||
1920             test_bit(RS_PROGRESS, &todo))
1921                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1922         if (test_bit(GO_DISKLESS, &todo))
1923                 go_diskless(device);
1924         if (test_bit(DESTROY_DISK, &todo))
1925                 drbd_ldev_destroy(device);
1926         if (test_bit(RS_START, &todo))
1927                 do_start_resync(device);
1928 }
1929
1930 #define DRBD_DEVICE_WORK_MASK   \
1931         ((1UL << GO_DISKLESS)   \
1932         |(1UL << DESTROY_DISK)  \
1933         |(1UL << MD_SYNC)       \
1934         |(1UL << RS_START)      \
1935         |(1UL << RS_PROGRESS)   \
1936         |(1UL << RS_DONE)       \
1937         )
1938
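/* Atomically fetch and clear the device work bits in *flags using a cmpxchg
 * loop; returns the bits that were set, e.g. (1UL << RS_START) if a resync
 * start was requested. */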
1939 static unsigned long get_work_bits(unsigned long *flags)
1940 {
1941         unsigned long old, new;
1942         do {
1943                 old = *flags;
1944                 new = old & ~DRBD_DEVICE_WORK_MASK;
1945         } while (cmpxchg(flags, old, new) != old);
1946         return old & DRBD_DEVICE_WORK_MASK;
1947 }
1948
1949 static void do_unqueued_work(struct drbd_connection *connection)
1950 {
1951         struct drbd_peer_device *peer_device;
1952         int vnr;
1953
1954         rcu_read_lock();
1955         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1956                 struct drbd_device *device = peer_device->device;
1957                 unsigned long todo = get_work_bits(&device->flags);
1958                 if (!todo)
1959                         continue;
1960
1961                 kref_get(&device->kref);
1962                 rcu_read_unlock();
1963                 do_device_work(device, todo);
1964                 kref_put(&device->kref, drbd_destroy_device);
1965                 rcu_read_lock();
1966         }
1967         rcu_read_unlock();
1968 }
1969
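/* Splice all currently queued work items onto work_list in one batch.
 * Returns true if there is now anything on work_list. */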
1970 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1971 {
1972         spin_lock_irq(&queue->q_lock);
1973         list_splice_tail_init(&queue->q, work_list);
1974         spin_unlock_irq(&queue->q_lock);
1975         return !list_empty(work_list);
1976 }
1977
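/* Wait for the next batch of sender work: drain sender_work, uncork/cork the
 * data socket around the wait as configured, close the current epoch with a
 * barrier once that became safe, and return on new work, pending device work
 * bits, a signal, or thread stop. */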
1978 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1979 {
1980         DEFINE_WAIT(wait);
1981         struct net_conf *nc;
1982         int uncork, cork;
1983
1984         dequeue_work_batch(&connection->sender_work, work_list);
1985         if (!list_empty(work_list))
1986                 return;
1987
1988         /* Still nothing to do?
1989          * Maybe we still need to close the current epoch,
1990          * even if no new requests are queued yet.
1991          *
1992          * Also, poke TCP, just in case.
1993          * Then wait for new work (or signal). */
1994         rcu_read_lock();
1995         nc = rcu_dereference(connection->net_conf);
1996         uncork = nc ? nc->tcp_cork : 0;
1997         rcu_read_unlock();
1998         if (uncork) {
1999                 mutex_lock(&connection->data.mutex);
2000                 if (connection->data.socket)
2001                         drbd_tcp_uncork(connection->data.socket);
2002                 mutex_unlock(&connection->data.mutex);
2003         }
2004
2005         for (;;) {
2006                 int send_barrier;
2007                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2008                 spin_lock_irq(&connection->resource->req_lock);
2009                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2010                 if (!list_empty(&connection->sender_work.q))
2011                         list_splice_tail_init(&connection->sender_work.q, work_list);
2012                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2013                 if (!list_empty(work_list) || signal_pending(current)) {
2014                         spin_unlock_irq(&connection->resource->req_lock);
2015                         break;
2016                 }
2017
2018                 /* We found nothing new to do, no to-be-communicated request,
2019                  * no other work item.  We may still need to close the last
2020                  * epoch.  The next incoming request's epoch will be the connection's
2021                  * current transfer log epoch number.  If that is different
2022                  * from the epoch of the last request we communicated, it is
2023                  * safe to send the epoch separating barrier now.
2024                  */
2025                 send_barrier =
2026                         atomic_read(&connection->current_tle_nr) !=
2027                         connection->send.current_epoch_nr;
2028                 spin_unlock_irq(&connection->resource->req_lock);
2029
2030                 if (send_barrier)
2031                         maybe_send_barrier(connection,
2032                                         connection->send.current_epoch_nr + 1);
2033
2034                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2035                         break;
2036
2037                 /* drbd_send() may have called flush_signals() */
2038                 if (get_t_state(&connection->worker) != RUNNING)
2039                         break;
2040
2041                 schedule();
2042                 /* We may be woken up for things other than new work, too,
2043                  * e.g. if the current epoch got closed,
2044                  * in which case we send the barrier above. */
2045         }
2046         finish_wait(&connection->sender_work.q_wait, &wait);
2047
2048         /* someone may have changed the config while we have been waiting above. */
2049         rcu_read_lock();
2050         nc = rcu_dereference(connection->net_conf);
2051         cork = nc ? nc->tcp_cork : 0;
2052         rcu_read_unlock();
2053         mutex_lock(&connection->data.mutex);
2054         if (connection->data.socket) {
2055                 if (cork)
2056                         drbd_tcp_cork(connection->data.socket);
2057                 else if (!uncork)
2058                         drbd_tcp_uncork(connection->data.socket);
2059         }
2060         mutex_unlock(&connection->data.mutex);
2061 }
2062
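/* Main loop of the per-connection worker thread: collect work batches, handle
 * per-device work bits, and run the queued work callbacks until the thread is
 * told to stop; then drain the remaining work and clean up all devices of
 * this connection. */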
2063 int drbd_worker(struct drbd_thread *thi)
2064 {
2065         struct drbd_connection *connection = thi->connection;
2066         struct drbd_work *w = NULL;
2067         struct drbd_peer_device *peer_device;
2068         LIST_HEAD(work_list);
2069         int vnr;
2070
2071         while (get_t_state(thi) == RUNNING) {
2072                 drbd_thread_current_set_cpu(thi);
2073
2074                 if (list_empty(&work_list)) {
2075                         update_worker_timing_details(connection, wait_for_work);
2076                         wait_for_work(connection, &work_list);
2077                 }
2078
2079                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2080                         update_worker_timing_details(connection, do_unqueued_work);
2081                         do_unqueued_work(connection);
2082                 }
2083
2084                 if (signal_pending(current)) {
2085                         flush_signals(current);
2086                         if (get_t_state(thi) == RUNNING) {
2087                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2088                                 continue;
2089                         }
2090                         break;
2091                 }
2092
2093                 if (get_t_state(thi) != RUNNING)
2094                         break;
2095
2096                 if (!list_empty(&work_list)) {
2097                         w = list_first_entry(&work_list, struct drbd_work, list);
2098                         list_del_init(&w->list);
2099                         update_worker_timing_details(connection, w->cb);
2100                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2101                                 continue;
2102                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2103                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2104                 }
2105         }
2106
2107         do {
2108                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2109                         update_worker_timing_details(connection, do_unqueued_work);
2110                         do_unqueued_work(connection);
2111                 }
2112                 if (!list_empty(&work_list)) {
2113                         w = list_first_entry(&work_list, struct drbd_work, list);
2114                         list_del_init(&w->list);
2115                         update_worker_timing_details(connection, w->cb);
2116                         w->cb(w, 1);
2117                 } else
2118                         dequeue_work_batch(&connection->sender_work, &work_list);
2119         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2120
2121         rcu_read_lock();
2122         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2123                 struct drbd_device *device = peer_device->device;
2124                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2125                 kref_get(&device->kref);
2126                 rcu_read_unlock();
2127                 drbd_device_cleanup(device);
2128                 kref_put(&device->kref, drbd_destroy_device);
2129                 rcu_read_lock();
2130         }
2131         rcu_read_unlock();
2132
2133         return 0;
2134 }