Add the rt linux 4.1.3-rt3 as base
[kvmfornfv.git] / kernel / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/wait.h>
30 #include <linux/mm.h>
31 #include <linux/memcontrol.h>
32 #include <linux/mm_inline.h>
33 #include <linux/slab.h>
34 #include <linux/random.h>
35 #include <linux/string.h>
36 #include <linux/scatterlist.h>
37
38 #include "drbd_int.h"
39 #include "drbd_protocol.h"
40 #include "drbd_req.h"
41
/* Forward declarations: both functions are defined further down in this
 * file, but are already called from w_resync_timer(). */
static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the resync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;
64
65 /* used for synchronous meta data and bitmap IO
66  * submitted by drbd_md_sync_page_io()
67  */
68 void drbd_md_endio(struct bio *bio, int error)
69 {
70         struct drbd_device *device;
71
72         device = bio->bi_private;
73         device->md_io.error = error;
74
75         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
76          * to timeout on the lower level device, and eventually detach from it.
77          * If this io completion runs after that timeout expired, this
78          * drbd_md_put_buffer() may allow us to finally try and re-attach.
79          * During normal operation, this only puts that extra reference
80          * down to 1 again.
81          * Make sure we first drop the reference, and only then signal
82          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
83          * next drbd_md_sync_page_io(), that we trigger the
84          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
85          */
86         drbd_md_put_buffer(device);
87         device->md_io.done = 1;
88         wake_up(&device->misc_wait);
89         bio_put(bio);
90         if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
91                 put_ldev(device);
92 }
93
94 /* reads on behalf of the partner,
95  * "submitted" by the receiver
96  */
97 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
98 {
99         unsigned long flags = 0;
100         struct drbd_peer_device *peer_device = peer_req->peer_device;
101         struct drbd_device *device = peer_device->device;
102
103         spin_lock_irqsave(&device->resource->req_lock, flags);
104         device->read_cnt += peer_req->i.size >> 9;
105         list_del(&peer_req->w.list);
106         if (list_empty(&device->read_ee))
107                 wake_up(&device->ee_wait);
108         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
109                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
110         spin_unlock_irqrestore(&device->resource->req_lock, flags);
111
112         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
113         put_ldev(device);
114 }
115
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.
 *
 * Moves the peer request to device->done_ee under the req_lock and then
 * does the follow-up work (resync bookkeeping, wake-ups, activity log
 * completion, waking the asender) using only values that were copied out
 * of the peer request beforehand.  Drops one ldev reference at the end.
 */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct drbd_interval i;
        int do_wake;
        u64 block_id;
        int do_al_complete_io;

        /* after we moved peer_req to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock) */
        /* Hence: take private copies of everything needed after the unlock. */
        i = peer_req->i;
        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
        block_id = peer_req->block_id;
        /* The flag is cleared here; drbd_al_complete_io() is called
         * from this function, based on the copy taken above. */
        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->writ_cnt += peer_req->i.size >> 9;
        list_move_tail(&peer_req->w.list, &device->done_ee);

        /*
         * Do not remove from the write_requests tree here: we did not send the
         * Ack yet and did not wake possibly waiting conflicting requests.
         * Removed from the tree from "drbd_process_done_ee" within the
         * appropriate dw.cb (e_end_block/e_end_resync_block) or from
         * _drbd_clear_done_ee.
         */

        /* Resync writes (ID_SYNCER) are tracked on sync_ee, all others on
         * active_ee; remember whether the relevant list is empty now. */
        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

        /* FIXME do we want to detach for failed REQ_DISCARD?
         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
        if (peer_req->flags & EE_WAS_ERROR)
                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        /* From here on only the local copies (i, block_id, do_*) are used. */
        if (block_id == ID_SYNCER)
                drbd_rs_complete_io(device, i.sector);

        if (do_wake)
                wake_up(&device->ee_wait);

        if (do_al_complete_io)
                drbd_al_complete_io(device, &i);

        wake_asender(peer_device->connection);
        put_ldev(device);
}
169
170 /* writes on behalf of the partner, or resync writes,
171  * "submitted" by the receiver.
172  */
173 void drbd_peer_request_endio(struct bio *bio, int error)
174 {
175         struct drbd_peer_request *peer_req = bio->bi_private;
176         struct drbd_device *device = peer_req->peer_device->device;
177         int uptodate = bio_flagged(bio, BIO_UPTODATE);
178         int is_write = bio_data_dir(bio) == WRITE;
179         int is_discard = !!(bio->bi_rw & REQ_DISCARD);
180
181         if (error && __ratelimit(&drbd_ratelimit_state))
182                 drbd_warn(device, "%s: error=%d s=%llus\n",
183                                 is_write ? (is_discard ? "discard" : "write")
184                                         : "read", error,
185                                 (unsigned long long)peer_req->i.sector);
186         if (!error && !uptodate) {
187                 if (__ratelimit(&drbd_ratelimit_state))
188                         drbd_warn(device, "%s: setting error to -EIO s=%llus\n",
189                                         is_write ? "write" : "read",
190                                         (unsigned long long)peer_req->i.sector);
191                 /* strange behavior of some lower level drivers...
192                  * fail the request by clearing the uptodate flag,
193                  * but do not return any error?! */
194                 error = -EIO;
195         }
196
197         if (error)
198                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
199
200         bio_put(bio); /* no need for the bio anymore */
201         if (atomic_dec_and_test(&peer_req->pending_bios)) {
202                 if (is_write)
203                         drbd_endio_write_sec_final(peer_req);
204                 else
205                         drbd_endio_read_sec_final(peer_req);
206         }
207 }
208
209 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
210  */
211 void drbd_request_endio(struct bio *bio, int error)
212 {
213         unsigned long flags;
214         struct drbd_request *req = bio->bi_private;
215         struct drbd_device *device = req->device;
216         struct bio_and_error m;
217         enum drbd_req_event what;
218         int uptodate = bio_flagged(bio, BIO_UPTODATE);
219
220         if (!error && !uptodate) {
221                 drbd_warn(device, "p %s: setting error to -EIO\n",
222                          bio_data_dir(bio) == WRITE ? "write" : "read");
223                 /* strange behavior of some lower level drivers...
224                  * fail the request by clearing the uptodate flag,
225                  * but do not return any error?! */
226                 error = -EIO;
227         }
228
229
230         /* If this request was aborted locally before,
231          * but now was completed "successfully",
232          * chances are that this caused arbitrary data corruption.
233          *
234          * "aborting" requests, or force-detaching the disk, is intended for
235          * completely blocked/hung local backing devices which do no longer
236          * complete requests at all, not even do error completions.  In this
237          * situation, usually a hard-reset and failover is the only way out.
238          *
239          * By "aborting", basically faking a local error-completion,
240          * we allow for a more graceful swichover by cleanly migrating services.
241          * Still the affected node has to be rebooted "soon".
242          *
243          * By completing these requests, we allow the upper layers to re-use
244          * the associated data pages.
245          *
246          * If later the local backing device "recovers", and now DMAs some data
247          * from disk into the original request pages, in the best case it will
248          * just put random data into unused pages; but typically it will corrupt
249          * meanwhile completely unrelated data, causing all sorts of damage.
250          *
251          * Which means delayed successful completion,
252          * especially for READ requests,
253          * is a reason to panic().
254          *
255          * We assume that a delayed *error* completion is OK,
256          * though we still will complain noisily about it.
257          */
258         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
259                 if (__ratelimit(&drbd_ratelimit_state))
260                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
261
262                 if (!error)
263                         panic("possible random memory corruption caused by delayed completion of aborted local request\n");
264         }
265
266         /* to avoid recursion in __req_mod */
267         if (unlikely(error)) {
268                 if (bio->bi_rw & REQ_DISCARD)
269                         what = (error == -EOPNOTSUPP)
270                                 ? DISCARD_COMPLETED_NOTSUPP
271                                 : DISCARD_COMPLETED_WITH_ERROR;
272                 else
273                         what = (bio_data_dir(bio) == WRITE)
274                         ? WRITE_COMPLETED_WITH_ERROR
275                         : (bio_rw(bio) == READ)
276                           ? READ_COMPLETED_WITH_ERROR
277                           : READ_AHEAD_COMPLETED_WITH_ERROR;
278         } else
279                 what = COMPLETED_OK;
280
281         bio_put(req->private_bio);
282         req->private_bio = ERR_PTR(error);
283
284         /* not req_mod(), we need irqsave here! */
285         spin_lock_irqsave(&device->resource->req_lock, flags);
286         __req_mod(req, what, &m);
287         spin_unlock_irqrestore(&device->resource->req_lock, flags);
288         put_ldev(device);
289
290         if (m.bio)
291                 complete_master_bio(device, &m);
292 }
293
294 void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
295 {
296         struct hash_desc desc;
297         struct scatterlist sg;
298         struct page *page = peer_req->pages;
299         struct page *tmp;
300         unsigned len;
301
302         desc.tfm = tfm;
303         desc.flags = 0;
304
305         sg_init_table(&sg, 1);
306         crypto_hash_init(&desc);
307
308         while ((tmp = page_chain_next(page))) {
309                 /* all but the last page will be fully used */
310                 sg_set_page(&sg, page, PAGE_SIZE, 0);
311                 crypto_hash_update(&desc, &sg, sg.length);
312                 page = tmp;
313         }
314         /* and now the last, possibly only partially used page */
315         len = peer_req->i.size & (PAGE_SIZE - 1);
316         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
317         crypto_hash_update(&desc, &sg, sg.length);
318         crypto_hash_final(&desc, digest);
319 }
320
321 void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
322 {
323         struct hash_desc desc;
324         struct scatterlist sg;
325         struct bio_vec bvec;
326         struct bvec_iter iter;
327
328         desc.tfm = tfm;
329         desc.flags = 0;
330
331         sg_init_table(&sg, 1);
332         crypto_hash_init(&desc);
333
334         bio_for_each_segment(bvec, bio, iter) {
335                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
336                 crypto_hash_update(&desc, &sg, sg.length);
337         }
338         crypto_hash_final(&desc, digest);
339 }
340
341 /* MAYBE merge common code with w_e_end_ov_req */
342 static int w_e_send_csum(struct drbd_work *w, int cancel)
343 {
344         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
345         struct drbd_peer_device *peer_device = peer_req->peer_device;
346         struct drbd_device *device = peer_device->device;
347         int digest_size;
348         void *digest;
349         int err = 0;
350
351         if (unlikely(cancel))
352                 goto out;
353
354         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
355                 goto out;
356
357         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
358         digest = kmalloc(digest_size, GFP_NOIO);
359         if (digest) {
360                 sector_t sector = peer_req->i.sector;
361                 unsigned int size = peer_req->i.size;
362                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
363                 /* Free peer_req and pages before send.
364                  * In case we block on congestion, we could otherwise run into
365                  * some distributed deadlock, if the other side blocks on
366                  * congestion as well, because our receiver blocks in
367                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
368                 drbd_free_peer_req(device, peer_req);
369                 peer_req = NULL;
370                 inc_rs_pending(device);
371                 err = drbd_send_drequest_csum(peer_device, sector, size,
372                                               digest, digest_size,
373                                               P_CSUM_RS_REQUEST);
374                 kfree(digest);
375         } else {
376                 drbd_err(device, "kmalloc() of digest failed.\n");
377                 err = -ENOMEM;
378         }
379
380 out:
381         if (peer_req)
382                 drbd_free_peer_req(device, peer_req);
383
384         if (unlikely(err))
385                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
386         return err;
387 }
388
/* Allocation mask used by read_for_csum() for allocations that are allowed
 * to fail, because the work can simply be retried later. */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
390
391 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
392 {
393         struct drbd_device *device = peer_device->device;
394         struct drbd_peer_request *peer_req;
395
396         if (!get_ldev(device))
397                 return -EIO;
398
399         /* GFP_TRY, because if there is no memory available right now, this may
400          * be rescheduled for later. It is "only" background resync, after all. */
401         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
402                                        size, true /* has real payload */, GFP_TRY);
403         if (!peer_req)
404                 goto defer;
405
406         peer_req->w.cb = w_e_send_csum;
407         spin_lock_irq(&device->resource->req_lock);
408         list_add_tail(&peer_req->w.list, &device->read_ee);
409         spin_unlock_irq(&device->resource->req_lock);
410
411         atomic_add(size >> 9, &device->rs_sect_ev);
412         if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
413                 return 0;
414
415         /* If it failed because of ENOMEM, retry should help.  If it failed
416          * because bio_add_page failed (probably broken lower level driver),
417          * retry may or may not help.
418          * If it does not, you may need to force disconnect. */
419         spin_lock_irq(&device->resource->req_lock);
420         list_del(&peer_req->w.list);
421         spin_unlock_irq(&device->resource->req_lock);
422
423         drbd_free_peer_req(device, peer_req);
424 defer:
425         put_ldev(device);
426         return -EAGAIN;
427 }
428
429 int w_resync_timer(struct drbd_work *w, int cancel)
430 {
431         struct drbd_device *device =
432                 container_of(w, struct drbd_device, resync_work);
433
434         switch (device->state.conn) {
435         case C_VERIFY_S:
436                 make_ov_request(device, cancel);
437                 break;
438         case C_SYNC_TARGET:
439                 make_resync_request(device, cancel);
440                 break;
441         }
442
443         return 0;
444 }
445
446 void resync_timer_fn(unsigned long data)
447 {
448         struct drbd_device *device = (struct drbd_device *) data;
449
450         drbd_queue_work_if_unqueued(
451                 &first_peer_device(device)->connection->sender_work,
452                 &device->resync_work);
453 }
454
455 static void fifo_set(struct fifo_buffer *fb, int value)
456 {
457         int i;
458
459         for (i = 0; i < fb->size; i++)
460                 fb->values[i] = value;
461 }
462
463 static int fifo_push(struct fifo_buffer *fb, int value)
464 {
465         int ov;
466
467         ov = fb->values[fb->head_index];
468         fb->values[fb->head_index++] = value;
469
470         if (fb->head_index >= fb->size)
471                 fb->head_index = 0;
472
473         return ov;
474 }
475
476 static void fifo_add_val(struct fifo_buffer *fb, int value)
477 {
478         int i;
479
480         for (i = 0; i < fb->size; i++)
481                 fb->values[i] += value;
482 }
483
484 struct fifo_buffer *fifo_alloc(int fifo_size)
485 {
486         struct fifo_buffer *fb;
487
488         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
489         if (!fb)
490                 return NULL;
491
492         fb->head_index = 0;
493         fb->size = fifo_size;
494         fb->total = 0;
495
496         return fb;
497 }
498
/* Resync rate controller: decide how many sectors to request in this turn.
 *
 * @device:  the device being resynced.
 * @sect_in: number of sectors that came in since the last turn.
 *
 * Computes how many sectors we want in flight, spreads the difference to
 * what is in flight or already planned evenly over all slots of the plan
 * fifo, and takes the head slot as the correction for this turn.
 *
 * Uses rcu_dereference() on disk_conf and rs_plan_s; the caller visible in
 * this file (drbd_rs_number_requests) holds rcu_read_lock() and only calls
 * this if the plan size is non-zero ("steps" is used as a divisor).
 *
 * Returns the number of sectors to request, clamped to [0, max_sect].
 */
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
        struct disk_conf *dc;
        unsigned int want;     /* The number of sectors we want in-flight */
        int req_sect; /* Number of sectors to request in this turn */
        int correction; /* Number of sectors more we need in-flight */
        int cps; /* correction per invocation of drbd_rs_controller() */
        int steps; /* Number of time steps to plan ahead */
        int curr_corr;
        int max_sect;
        struct fifo_buffer *plan;

        dc = rcu_dereference(device->ldev->disk_conf);
        plan = rcu_dereference(device->rs_plan_s);

        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
        } else { /* normal path */
                /* a configured c_fill_target wins; otherwise derive the
                 * target from what came in and c_delay_target */
                want = dc->c_fill_target ? dc->c_fill_target :
                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
        }

        correction = want - device->rs_in_flight - plan->total;

        /* Plan ahead */
        cps = correction / steps;
        fifo_add_val(plan, cps);
        plan->total += cps * steps;

        /* What we do in this step */
        /* take the head slot and replace it by an empty (0) one */
        curr_corr = fifo_push(plan, 0);
        plan->total -= curr_corr;

        req_sect = sect_in + curr_corr;
        if (req_sect < 0)
                req_sect = 0;

        /* never request more than c_max_rate allows */
        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
        if (req_sect > max_sect)
                req_sect = max_sect;

        /*
        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
                 sect_in, device->rs_in_flight, want, correction,
                 steps, cps, device->rs_planed, curr_corr, req_sect);
        */

        return req_sect;
}
550
/* Compute how many resync/verify requests to generate in this turn.
 *
 * Fetches and resets device->rs_sect_in, subtracts it from rs_in_flight,
 * then either asks drbd_rs_controller() (if the resync plan has a non-zero
 * size) or derives the number from the configured resync_rate.
 * device->c_sync_rate is updated as a side effect.
 *
 * Returns the number of requests, in units of BM_BLOCK_SIZE.  The result
 * may be zero or negative; callers visible in this file handle that
 * (make_resync_request requeues, make_ov_request's loop does not run).
 *
 * NOTE(review): dereferences device->ldev here and in drbd_rs_controller();
 * make_ov_request() calls this without a get_ldev() of its own -- confirm
 * that the ldev is guaranteed to be valid on that path.
 */
static int drbd_rs_number_requests(struct drbd_device *device)
{
        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
        int number, mxb;

        sect_in = atomic_xchg(&device->rs_sect_in, 0);
        device->rs_in_flight -= sect_in;

        rcu_read_lock();
        mxb = drbd_get_max_buffers(device) / 2;
        if (rcu_dereference(device->rs_plan_s)->size) {
                /* dynamic controller: sectors -> BM_BLOCK_SIZE units */
                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
        } else {
                /* no plan: fixed, configured resync rate */
                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
        }
        rcu_read_unlock();

        /* Don't have more than "max-buffers"/2 in-flight.
         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
         * potentially causing a distributed deadlock on congestion during
         * online-verify or (checksum-based) resync, if max-buffers,
         * socket buffer sizes and resync rate settings are mis-configured. */

        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
         * mxb (as used here, and in drbd_alloc_pages on the peer) is
         * "number of pages" (typically also 4k),
         * but "rs_in_flight" is in "sectors" (512 Byte). */
        if (mxb - device->rs_in_flight/8 < number)
                number = mxb - device->rs_in_flight/8;

        return number;
}
585
/* Generate the next batch of resync requests.
 *
 * @device: the device being resynced.
 * @cancel: if set, do nothing.
 *
 * Walks the bitmap starting at device->bm_resync_fo and, for up to the
 * number of requests returned by drbd_rs_number_requests(), either reads
 * the block locally for a checksum based request (device->use_csums) or
 * sends a P_RS_DATA_REQUEST.  Adjacent dirty bits may be merged into one
 * bigger, aligned request.  Unless the batch ends early, the resync timer
 * is re-armed to run again after SLEEP_TIME.
 *
 * Returns 0, except -EIO if read_for_csum() reported -EIO, or the error
 * of drbd_send_drequest().  (w_resync_timer() ignores the return value.)
 *
 * NOTE(review): "connection" is NULL if first_peer_device() returns NULL,
 * but it is dereferenced unconditionally inside the loop -- confirm that a
 * peer device always exists here.
 * NOTE(review): "align" and "rollback_i" are only assigned inside the
 * "#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE" section, while "rollback_i" is
 * read in the -EAGAIN case outside of it.
 * NOTE(review): the early "return" exits inside the loop do not add the
 * requests issued so far to rs_in_flight; only the "requeue" exit does.
 */
static int make_resync_request(struct drbd_device *const device, int cancel)
{
        struct drbd_peer_device *const peer_device = first_peer_device(device);
        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        int max_bio_size;
        int number, rollback_i, size;
        int align, requeue = 0;
        int i = 0;

        if (unlikely(cancel))
                return 0;

        if (device->rs_total == 0) {
                /* empty resync? */
                drbd_resync_finished(device);
                return 0;
        }

        if (!get_ldev(device)) {
                /* Since we only need to access device->rsync a
                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
                   to continue resync with a broken disk makes no sense at
                   all */
                drbd_err(device, "Disk broke down during resync!\n");
                return 0;
        }

        /* From here on, every exit path has to put_ldev(). */
        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
        number = drbd_rs_number_requests(device);
        if (number <= 0)
                goto requeue;

        for (i = 0; i < number; i++) {
                /* Stop generating RS requests when half of the send buffer is filled,
                 * but notify TCP that we'd like to have more space. */
                mutex_lock(&connection->data.mutex);
                if (connection->data.socket) {
                        struct sock *sk = connection->data.socket->sk;
                        int queued = sk->sk_wmem_queued;
                        int sndbuf = sk->sk_sndbuf;
                        if (queued > sndbuf / 2) {
                                requeue = 1;
                                if (sk->sk_socket)
                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        }
                } else
                        requeue = 1;
                mutex_unlock(&connection->data.mutex);
                if (requeue)
                        goto requeue;

next_sector:
                size = BM_BLOCK_SIZE;
                bit  = drbd_bm_find_next(device, device->bm_resync_fo);

                if (bit == DRBD_END_OF_BITMAP) {
                        /* nothing more to request; no timer re-arm here */
                        device->bm_resync_fo = drbd_bm_bits(device);
                        put_ldev(device);
                        return 0;
                }

                sector = BM_BIT_TO_SECT(bit);

                if (drbd_try_rs_begin_io(device, sector)) {
                        /* could not start on this sector now:
                         * retry from the same bit on the next turn */
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
                device->bm_resync_fo = bit + 1;

                /* bit got cleared in the meantime: skip it */
                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
                        drbd_rs_complete_io(device, sector);
                        goto next_sector;
                }

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
                /* try to find some adjacent bits.
                 * we stop if we have already the maximum req size.
                 *
                 * Additionally always align bigger requests, in order to
                 * be prepared for all stripe sizes of software RAIDs.
                 */
                align = 1;
                rollback_i = i;
                while (i < number) {
                        if (size + BM_BLOCK_SIZE > max_bio_size)
                                break;

                        /* Be always aligned */
                        if (sector & ((1<<(align+3))-1))
                                break;

                        /* do not cross extent boundaries */
                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
                                break;
                        /* now, is it actually dirty, after all?
                         * caution, drbd_bm_test_bit is tri-state for some
                         * obscure reason; ( b == 0 ) would get the out-of-band
                         * only accidentally right because of the "oddly sized"
                         * adjustment below */
                        if (drbd_bm_test_bit(device, bit+1) != 1)
                                break;
                        bit++;
                        size += BM_BLOCK_SIZE;
                        if ((BM_BLOCK_SIZE << align) <= size)
                                align++;
                        /* each merged block counts as one request */
                        i++;
                }
                /* if we merged some,
                 * reset the offset to start the next drbd_bm_find_next from */
                if (size > BM_BLOCK_SIZE)
                        device->bm_resync_fo = bit + 1;
#endif

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                if (device->use_csums) {
                        switch (read_for_csum(peer_device, sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
                        case -EAGAIN: /* allocation failed, or ldev busy */
                                /* undo: restart from this sector next time,
                                 * and do not count the merged blocks */
                                drbd_rs_complete_io(device, sector);
                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
                                i = rollback_i;
                                goto requeue;
                        case 0:
                                /* everything ok */
                                break;
                        default:
                                BUG();
                        }
                } else {
                        int err;

                        inc_rs_pending(device);
                        err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
                                dec_rs_pending(device);
                                put_ldev(device);
                                return err;
                        }
                }
        }

        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
                /* last syncer _request_ was sent,
                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
                 * next sync group will resume), as soon as we receive the last
                 * resync data block, and the last bit is cleared.
                 * until then resync "work" is "inactive" ...
                 */
                put_ldev(device);
                return 0;
        }

 requeue:
        /* account what was requested in this turn (in sectors),
         * and schedule the next turn */
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(device);
        return 0;
}
754
/* Generate the next batch of online-verify requests.
 *
 * @device: the device being verified.
 * @cancel: if set, do nothing and return 1.
 *
 * Starting at device->ov_position, sends up to the number of requests
 * returned by drbd_rs_number_requests(), one BM_BLOCK_SIZE block each (the
 * last one shortened to the device capacity), and stores the new position.
 * The resync timer is re-armed unless the stop sector was reached after at
 * least one request had been sent.
 *
 * Returns 1, except 0 if drbd_send_ov_request() failed.
 * (w_resync_timer() ignores the return value.)
 *
 * Note: the "sector >= capacity" and send-failure exits return directly,
 * without updating ov_position or rs_in_flight and without re-arming the
 * timer.
 */
static int make_ov_request(struct drbd_device *device, int cancel)
{
        int number, i, size;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        bool stop_sector_reached = false;

        if (unlikely(cancel))
                return 1;

        number = drbd_rs_number_requests(device);

        sector = device->ov_position;
        for (i = 0; i < number; i++) {
                if (sector >= capacity)
                        return 1;

                /* We check for "finished" only in the reply path:
                 * w_e_end_ov_reply().
                 * We need to send at least one request out. */
                stop_sector_reached = i > 0
                        && verify_can_do_stop_sector(device)
                        && sector >= device->ov_stop_sector;
                if (stop_sector_reached)
                        break;

                size = BM_BLOCK_SIZE;

                if (drbd_try_rs_begin_io(device, sector)) {
                        /* could not start on this sector now:
                         * retry from here on the next turn */
                        device->ov_position = sector;
                        goto requeue;
                }

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                inc_rs_pending(device);
                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
                        dec_rs_pending(device);
                        return 0;
                }
                sector += BM_SECT_PER_BIT;
        }
        device->ov_position = sector;

 requeue:
        /* account what was requested in this turn (in sectors) */
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        if (i == 0 || !stop_sector_reached)
                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        return 1;
}
806
807 int w_ov_finished(struct drbd_work *w, int cancel)
808 {
809         struct drbd_device_work *dw =
810                 container_of(w, struct drbd_device_work, w);
811         struct drbd_device *device = dw->device;
812         kfree(dw);
813         ov_out_of_sync_print(device);
814         drbd_resync_finished(device);
815
816         return 0;
817 }
818
819 static int w_resync_finished(struct drbd_work *w, int cancel)
820 {
821         struct drbd_device_work *dw =
822                 container_of(w, struct drbd_device_work, w);
823         struct drbd_device *device = dw->device;
824         kfree(dw);
825
826         drbd_resync_finished(device);
827
828         return 0;
829 }
830
831 static void ping_peer(struct drbd_device *device)
832 {
833         struct drbd_connection *connection = first_peer_device(device)->connection;
834
835         clear_bit(GOT_PING_ACK, &connection->flags);
836         request_ping(connection);
837         wait_event(connection->ping_wait,
838                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
839 }
840
841 int drbd_resync_finished(struct drbd_device *device)
842 {
843         unsigned long db, dt, dbdt;
844         unsigned long n_oos;
845         union drbd_state os, ns;
846         struct drbd_device_work *dw;
847         char *khelper_cmd = NULL;
848         int verify_done = 0;
849
850         /* Remove all elements from the resync LRU. Since future actions
851          * might set bits in the (main) bitmap, then the entries in the
852          * resync LRU would be wrong. */
853         if (drbd_rs_del_all(device)) {
854                 /* In case this is not possible now, most probably because
855                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
856                  * queue (or even the read operations for those packets
857                  * is not finished by now).   Retry in 100ms. */
858
859                 schedule_timeout_interruptible(HZ / 10);
860                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
861                 if (dw) {
862                         dw->w.cb = w_resync_finished;
863                         dw->device = device;
864                         drbd_queue_work(&first_peer_device(device)->connection->sender_work,
865                                         &dw->w);
866                         return 1;
867                 }
868                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
869         }
870
871         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
872         if (dt <= 0)
873                 dt = 1;
874
875         db = device->rs_total;
876         /* adjust for verify start and stop sectors, respective reached position */
877         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
878                 db -= device->ov_left;
879
880         dbdt = Bit2KB(db/dt);
881         device->rs_paused /= HZ;
882
883         if (!get_ldev(device))
884                 goto out;
885
886         ping_peer(device);
887
888         spin_lock_irq(&device->resource->req_lock);
889         os = drbd_read_state(device);
890
891         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
892
893         /* This protects us against multiple calls (that can happen in the presence
894            of application IO), and against connectivity loss just before we arrive here. */
895         if (os.conn <= C_CONNECTED)
896                 goto out_unlock;
897
898         ns = os;
899         ns.conn = C_CONNECTED;
900
901         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
902              verify_done ? "Online verify" : "Resync",
903              dt + device->rs_paused, device->rs_paused, dbdt);
904
905         n_oos = drbd_bm_total_weight(device);
906
907         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
908                 if (n_oos) {
909                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
910                               n_oos, Bit2KB(1));
911                         khelper_cmd = "out-of-sync";
912                 }
913         } else {
914                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
915
916                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
917                         khelper_cmd = "after-resync-target";
918
919                 if (device->use_csums && device->rs_total) {
920                         const unsigned long s = device->rs_same_csum;
921                         const unsigned long t = device->rs_total;
922                         const int ratio =
923                                 (t == 0)     ? 0 :
924                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
925                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
926                              "transferred %luK total %luK\n",
927                              ratio,
928                              Bit2KB(device->rs_same_csum),
929                              Bit2KB(device->rs_total - device->rs_same_csum),
930                              Bit2KB(device->rs_total));
931                 }
932         }
933
934         if (device->rs_failed) {
935                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
936
937                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
938                         ns.disk = D_INCONSISTENT;
939                         ns.pdsk = D_UP_TO_DATE;
940                 } else {
941                         ns.disk = D_UP_TO_DATE;
942                         ns.pdsk = D_INCONSISTENT;
943                 }
944         } else {
945                 ns.disk = D_UP_TO_DATE;
946                 ns.pdsk = D_UP_TO_DATE;
947
948                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
949                         if (device->p_uuid) {
950                                 int i;
951                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
952                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
953                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
954                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
955                         } else {
956                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
957                         }
958                 }
959
960                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
961                         /* for verify runs, we don't update uuids here,
962                          * so there would be nothing to report. */
963                         drbd_uuid_set_bm(device, 0UL);
964                         drbd_print_uuids(device, "updated UUIDs");
965                         if (device->p_uuid) {
966                                 /* Now the two UUID sets are equal, update what we
967                                  * know of the peer. */
968                                 int i;
969                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
970                                         device->p_uuid[i] = device->ldev->md.uuid[i];
971                         }
972                 }
973         }
974
975         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
976 out_unlock:
977         spin_unlock_irq(&device->resource->req_lock);
978         put_ldev(device);
979 out:
980         device->rs_total  = 0;
981         device->rs_failed = 0;
982         device->rs_paused = 0;
983
984         /* reset start sector, if we reached end of device */
985         if (verify_done && device->ov_left == 0)
986                 device->ov_start_sector = 0;
987
988         drbd_md_sync(device);
989
990         if (khelper_cmd)
991                 drbd_khelper(device, khelper_cmd);
992
993         return 1;
994 }
995
996 /* helper */
997 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
998 {
999         if (drbd_peer_req_has_active_page(peer_req)) {
1000                 /* This might happen if sendpage() has not finished */
1001                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1002                 atomic_add(i, &device->pp_in_use_by_net);
1003                 atomic_sub(i, &device->pp_in_use);
1004                 spin_lock_irq(&device->resource->req_lock);
1005                 list_add_tail(&peer_req->w.list, &device->net_ee);
1006                 spin_unlock_irq(&device->resource->req_lock);
1007                 wake_up(&drbd_pp_wait);
1008         } else
1009                 drbd_free_peer_req(device, peer_req);
1010 }
1011
1012 /**
1013  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1014  * @device:     DRBD device.
1015  * @w:          work object.
1016  * @cancel:     The connection will be closed anyways
1017  */
1018 int w_e_end_data_req(struct drbd_work *w, int cancel)
1019 {
1020         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1021         struct drbd_peer_device *peer_device = peer_req->peer_device;
1022         struct drbd_device *device = peer_device->device;
1023         int err;
1024
1025         if (unlikely(cancel)) {
1026                 drbd_free_peer_req(device, peer_req);
1027                 dec_unacked(device);
1028                 return 0;
1029         }
1030
1031         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1032                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1033         } else {
1034                 if (__ratelimit(&drbd_ratelimit_state))
1035                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1036                             (unsigned long long)peer_req->i.sector);
1037
1038                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1039         }
1040
1041         dec_unacked(device);
1042
1043         move_to_net_ee_or_free(device, peer_req);
1044
1045         if (unlikely(err))
1046                 drbd_err(device, "drbd_send_block() failed\n");
1047         return err;
1048 }
1049
1050 /**
1051  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1052  * @w:          work object.
1053  * @cancel:     The connection will be closed anyways
1054  */
1055 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1056 {
1057         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1058         struct drbd_peer_device *peer_device = peer_req->peer_device;
1059         struct drbd_device *device = peer_device->device;
1060         int err;
1061
1062         if (unlikely(cancel)) {
1063                 drbd_free_peer_req(device, peer_req);
1064                 dec_unacked(device);
1065                 return 0;
1066         }
1067
1068         if (get_ldev_if_state(device, D_FAILED)) {
1069                 drbd_rs_complete_io(device, peer_req->i.sector);
1070                 put_ldev(device);
1071         }
1072
1073         if (device->state.conn == C_AHEAD) {
1074                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1075         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1076                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1077                         inc_rs_pending(device);
1078                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1079                 } else {
1080                         if (__ratelimit(&drbd_ratelimit_state))
1081                                 drbd_err(device, "Not sending RSDataReply, "
1082                                     "partner DISKLESS!\n");
1083                         err = 0;
1084                 }
1085         } else {
1086                 if (__ratelimit(&drbd_ratelimit_state))
1087                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1088                             (unsigned long long)peer_req->i.sector);
1089
1090                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1091
1092                 /* update resync data with failure */
1093                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1094         }
1095
1096         dec_unacked(device);
1097
1098         move_to_net_ee_or_free(device, peer_req);
1099
1100         if (unlikely(err))
1101                 drbd_err(device, "drbd_send_block() failed\n");
1102         return err;
1103 }
1104
1105 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1106 {
1107         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1108         struct drbd_peer_device *peer_device = peer_req->peer_device;
1109         struct drbd_device *device = peer_device->device;
1110         struct digest_info *di;
1111         int digest_size;
1112         void *digest = NULL;
1113         int err, eq = 0;
1114
1115         if (unlikely(cancel)) {
1116                 drbd_free_peer_req(device, peer_req);
1117                 dec_unacked(device);
1118                 return 0;
1119         }
1120
1121         if (get_ldev(device)) {
1122                 drbd_rs_complete_io(device, peer_req->i.sector);
1123                 put_ldev(device);
1124         }
1125
1126         di = peer_req->digest;
1127
1128         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1129                 /* quick hack to try to avoid a race against reconfiguration.
1130                  * a real fix would be much more involved,
1131                  * introducing more locking mechanisms */
1132                 if (peer_device->connection->csums_tfm) {
1133                         digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1134                         D_ASSERT(device, digest_size == di->digest_size);
1135                         digest = kmalloc(digest_size, GFP_NOIO);
1136                 }
1137                 if (digest) {
1138                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1139                         eq = !memcmp(digest, di->digest, digest_size);
1140                         kfree(digest);
1141                 }
1142
1143                 if (eq) {
1144                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1145                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1146                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1147                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1148                 } else {
1149                         inc_rs_pending(device);
1150                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1151                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1152                         kfree(di);
1153                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154                 }
1155         } else {
1156                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1157                 if (__ratelimit(&drbd_ratelimit_state))
1158                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1159         }
1160
1161         dec_unacked(device);
1162         move_to_net_ee_or_free(device, peer_req);
1163
1164         if (unlikely(err))
1165                 drbd_err(device, "drbd_send_block/ack() failed\n");
1166         return err;
1167 }
1168
1169 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1170 {
1171         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1172         struct drbd_peer_device *peer_device = peer_req->peer_device;
1173         struct drbd_device *device = peer_device->device;
1174         sector_t sector = peer_req->i.sector;
1175         unsigned int size = peer_req->i.size;
1176         int digest_size;
1177         void *digest;
1178         int err = 0;
1179
1180         if (unlikely(cancel))
1181                 goto out;
1182
1183         digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1184         digest = kmalloc(digest_size, GFP_NOIO);
1185         if (!digest) {
1186                 err = 1;        /* terminate the connection in case the allocation failed */
1187                 goto out;
1188         }
1189
1190         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1191                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1192         else
1193                 memset(digest, 0, digest_size);
1194
1195         /* Free e and pages before send.
1196          * In case we block on congestion, we could otherwise run into
1197          * some distributed deadlock, if the other side blocks on
1198          * congestion as well, because our receiver blocks in
1199          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1200         drbd_free_peer_req(device, peer_req);
1201         peer_req = NULL;
1202         inc_rs_pending(device);
1203         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1204         if (err)
1205                 dec_rs_pending(device);
1206         kfree(digest);
1207
1208 out:
1209         if (peer_req)
1210                 drbd_free_peer_req(device, peer_req);
1211         dec_unacked(device);
1212         return err;
1213 }
1214
1215 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1216 {
1217         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1218                 device->ov_last_oos_size += size>>9;
1219         } else {
1220                 device->ov_last_oos_start = sector;
1221                 device->ov_last_oos_size = size>>9;
1222         }
1223         drbd_set_out_of_sync(device, sector, size);
1224 }
1225
1226 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1227 {
1228         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1229         struct drbd_peer_device *peer_device = peer_req->peer_device;
1230         struct drbd_device *device = peer_device->device;
1231         struct digest_info *di;
1232         void *digest;
1233         sector_t sector = peer_req->i.sector;
1234         unsigned int size = peer_req->i.size;
1235         int digest_size;
1236         int err, eq = 0;
1237         bool stop_sector_reached = false;
1238
1239         if (unlikely(cancel)) {
1240                 drbd_free_peer_req(device, peer_req);
1241                 dec_unacked(device);
1242                 return 0;
1243         }
1244
1245         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1246          * the resync lru has been cleaned up already */
1247         if (get_ldev(device)) {
1248                 drbd_rs_complete_io(device, peer_req->i.sector);
1249                 put_ldev(device);
1250         }
1251
1252         di = peer_req->digest;
1253
1254         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1255                 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1256                 digest = kmalloc(digest_size, GFP_NOIO);
1257                 if (digest) {
1258                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1259
1260                         D_ASSERT(device, digest_size == di->digest_size);
1261                         eq = !memcmp(digest, di->digest, digest_size);
1262                         kfree(digest);
1263                 }
1264         }
1265
1266         /* Free peer_req and pages before send.
1267          * In case we block on congestion, we could otherwise run into
1268          * some distributed deadlock, if the other side blocks on
1269          * congestion as well, because our receiver blocks in
1270          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1271         drbd_free_peer_req(device, peer_req);
1272         if (!eq)
1273                 drbd_ov_out_of_sync_found(device, sector, size);
1274         else
1275                 ov_out_of_sync_print(device);
1276
1277         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1278                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1279
1280         dec_unacked(device);
1281
1282         --device->ov_left;
1283
1284         /* let's advance progress step marks only for every other megabyte */
1285         if ((device->ov_left & 0x200) == 0x200)
1286                 drbd_advance_rs_marks(device, device->ov_left);
1287
1288         stop_sector_reached = verify_can_do_stop_sector(device) &&
1289                 (sector + (size>>9)) >= device->ov_stop_sector;
1290
1291         if (device->ov_left == 0 || stop_sector_reached) {
1292                 ov_out_of_sync_print(device);
1293                 drbd_resync_finished(device);
1294         }
1295
1296         return err;
1297 }
1298
1299 /* FIXME
1300  * We need to track the number of pending barrier acks,
1301  * and to be able to wait for them.
1302  * See also comment in drbd_adm_attach before drbd_suspend_io.
1303  */
1304 static int drbd_send_barrier(struct drbd_connection *connection)
1305 {
1306         struct p_barrier *p;
1307         struct drbd_socket *sock;
1308
1309         sock = &connection->data;
1310         p = conn_prepare_command(connection, sock);
1311         if (!p)
1312                 return -EIO;
1313         p->barrier = connection->send.current_epoch_nr;
1314         p->pad = 0;
1315         connection->send.current_epoch_writes = 0;
1316
1317         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1318 }
1319
1320 int w_send_write_hint(struct drbd_work *w, int cancel)
1321 {
1322         struct drbd_device *device =
1323                 container_of(w, struct drbd_device, unplug_work);
1324         struct drbd_socket *sock;
1325
1326         if (cancel)
1327                 return 0;
1328         sock = &first_peer_device(device)->connection->data;
1329         if (!drbd_prepare_command(first_peer_device(device), sock))
1330                 return -EIO;
1331         return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1332 }
1333
1334 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1335 {
1336         if (!connection->send.seen_any_write_yet) {
1337                 connection->send.seen_any_write_yet = true;
1338                 connection->send.current_epoch_nr = epoch;
1339                 connection->send.current_epoch_writes = 0;
1340         }
1341 }
1342
1343 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1344 {
1345         /* re-init if first write on this connection */
1346         if (!connection->send.seen_any_write_yet)
1347                 return;
1348         if (connection->send.current_epoch_nr != epoch) {
1349                 if (connection->send.current_epoch_writes)
1350                         drbd_send_barrier(connection);
1351                 connection->send.current_epoch_nr = epoch;
1352         }
1353 }
1354
1355 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1356 {
1357         struct drbd_request *req = container_of(w, struct drbd_request, w);
1358         struct drbd_device *device = req->device;
1359         struct drbd_peer_device *const peer_device = first_peer_device(device);
1360         struct drbd_connection *const connection = peer_device->connection;
1361         int err;
1362
1363         if (unlikely(cancel)) {
1364                 req_mod(req, SEND_CANCELED);
1365                 return 0;
1366         }
1367         req->pre_send_jif = jiffies;
1368
1369         /* this time, no connection->send.current_epoch_writes++;
1370          * If it was sent, it was the closing barrier for the last
1371          * replicated epoch, before we went into AHEAD mode.
1372          * No more barriers will be sent, until we leave AHEAD mode again. */
1373         maybe_send_barrier(connection, req->epoch);
1374
1375         err = drbd_send_out_of_sync(peer_device, req);
1376         req_mod(req, OOS_HANDED_TO_NETWORK);
1377
1378         return err;
1379 }
1380
1381 /**
1382  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1383  * @w:          work object.
1384  * @cancel:     The connection will be closed anyways
1385  */
1386 int w_send_dblock(struct drbd_work *w, int cancel)
1387 {
1388         struct drbd_request *req = container_of(w, struct drbd_request, w);
1389         struct drbd_device *device = req->device;
1390         struct drbd_peer_device *const peer_device = first_peer_device(device);
1391         struct drbd_connection *connection = peer_device->connection;
1392         int err;
1393
1394         if (unlikely(cancel)) {
1395                 req_mod(req, SEND_CANCELED);
1396                 return 0;
1397         }
1398         req->pre_send_jif = jiffies;
1399
1400         re_init_if_first_write(connection, req->epoch);
1401         maybe_send_barrier(connection, req->epoch);
1402         connection->send.current_epoch_writes++;
1403
1404         err = drbd_send_dblock(peer_device, req);
1405         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1406
1407         return err;
1408 }
1409
1410 /**
1411  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1412  * @w:          work object.
1413  * @cancel:     The connection will be closed anyways
1414  */
1415 int w_send_read_req(struct drbd_work *w, int cancel)
1416 {
1417         struct drbd_request *req = container_of(w, struct drbd_request, w);
1418         struct drbd_device *device = req->device;
1419         struct drbd_peer_device *const peer_device = first_peer_device(device);
1420         struct drbd_connection *connection = peer_device->connection;
1421         int err;
1422
1423         if (unlikely(cancel)) {
1424                 req_mod(req, SEND_CANCELED);
1425                 return 0;
1426         }
1427         req->pre_send_jif = jiffies;
1428
1429         /* Even read requests may close a write epoch,
1430          * if there was any yet. */
1431         maybe_send_barrier(connection, req->epoch);
1432
1433         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1434                                  (unsigned long)req);
1435
1436         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1437
1438         return err;
1439 }
1440
1441 int w_restart_disk_io(struct drbd_work *w, int cancel)
1442 {
1443         struct drbd_request *req = container_of(w, struct drbd_request, w);
1444         struct drbd_device *device = req->device;
1445
1446         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1447                 drbd_al_begin_io(device, &req->i);
1448
1449         drbd_req_make_private_bio(req, req->master_bio);
1450         req->private_bio->bi_bdev = device->ldev->backing_bdev;
1451         generic_make_request(req->private_bio);
1452
1453         return 0;
1454 }
1455
1456 static int _drbd_may_sync_now(struct drbd_device *device)
1457 {
1458         struct drbd_device *odev = device;
1459         int resync_after;
1460
1461         while (1) {
1462                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1463                         return 1;
1464                 rcu_read_lock();
1465                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1466                 rcu_read_unlock();
1467                 if (resync_after == -1)
1468                         return 1;
1469                 odev = minor_to_device(resync_after);
1470                 if (!odev)
1471                         return 1;
1472                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1473                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1474                     odev->state.aftr_isp || odev->state.peer_isp ||
1475                     odev->state.user_isp)
1476                         return 0;
1477         }
1478 }
1479
1480 /**
1481  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1482  * @device:     DRBD device.
1483  *
1484  * Called from process context only (admin command and after_state_ch).
1485  */
1486 static int _drbd_pause_after(struct drbd_device *device)
1487 {
1488         struct drbd_device *odev;
1489         int i, rv = 0;
1490
1491         rcu_read_lock();
1492         idr_for_each_entry(&drbd_devices, odev, i) {
1493                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1494                         continue;
1495                 if (!_drbd_may_sync_now(odev))
1496                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1497                                != SS_NOTHING_TO_DO);
1498         }
1499         rcu_read_unlock();
1500
1501         return rv;
1502 }
1503
1504 /**
1505  * _drbd_resume_next() - Resume resync on all devices that may resync now
1506  * @device:     DRBD device.
1507  *
1508  * Called from process context only (admin command and worker).
1509  */
1510 static int _drbd_resume_next(struct drbd_device *device)
1511 {
1512         struct drbd_device *odev;
1513         int i, rv = 0;
1514
1515         rcu_read_lock();
1516         idr_for_each_entry(&drbd_devices, odev, i) {
1517                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1518                         continue;
1519                 if (odev->state.aftr_isp) {
1520                         if (_drbd_may_sync_now(odev))
1521                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1522                                                         CS_HARD, NULL)
1523                                        != SS_NOTHING_TO_DO) ;
1524                 }
1525         }
1526         rcu_read_unlock();
1527         return rv;
1528 }
1529
1530 void resume_next_sg(struct drbd_device *device)
1531 {
1532         write_lock_irq(&global_state_lock);
1533         _drbd_resume_next(device);
1534         write_unlock_irq(&global_state_lock);
1535 }
1536
1537 void suspend_other_sg(struct drbd_device *device)
1538 {
1539         write_lock_irq(&global_state_lock);
1540         _drbd_pause_after(device);
1541         write_unlock_irq(&global_state_lock);
1542 }
1543
1544 /* caller must hold global_state_lock */
1545 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1546 {
1547         struct drbd_device *odev;
1548         int resync_after;
1549
1550         if (o_minor == -1)
1551                 return NO_ERROR;
1552         if (o_minor < -1 || o_minor > MINORMASK)
1553                 return ERR_RESYNC_AFTER;
1554
1555         /* check for loops */
1556         odev = minor_to_device(o_minor);
1557         while (1) {
1558                 if (odev == device)
1559                         return ERR_RESYNC_AFTER_CYCLE;
1560
1561                 /* You are free to depend on diskless, non-existing,
1562                  * or not yet/no longer existing minors.
1563                  * We only reject dependency loops.
1564                  * We cannot follow the dependency chain beyond a detached or
1565                  * missing minor.
1566                  */
1567                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1568                         return NO_ERROR;
1569
1570                 rcu_read_lock();
1571                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1572                 rcu_read_unlock();
1573                 /* dependency chain ends here, no cycles. */
1574                 if (resync_after == -1)
1575                         return NO_ERROR;
1576
1577                 /* follow the dependency chain */
1578                 odev = minor_to_device(resync_after);
1579         }
1580 }
1581
/*
 * drbd_resync_after_changed() - re-evaluate pause/resume for all devices
 *
 * Alternates _drbd_pause_after() and _drbd_resume_next() until a full round
 * reports no change from either of them.
 *
 * caller must hold global_state_lock
 */
void drbd_resync_after_changed(struct drbd_device *device)
{
	for (;;) {
		int paused = _drbd_pause_after(device);
		int resumed = _drbd_resume_next(device);

		if (!paused && !resumed)
			break;
	}
}
1592
1593 void drbd_rs_controller_reset(struct drbd_device *device)
1594 {
1595         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1596         struct fifo_buffer *plan;
1597
1598         atomic_set(&device->rs_sect_in, 0);
1599         atomic_set(&device->rs_sect_ev, 0);
1600         device->rs_in_flight = 0;
1601         device->rs_last_events =
1602                 (int)part_stat_read(&disk->part0, sectors[0]) +
1603                 (int)part_stat_read(&disk->part0, sectors[1]);
1604
1605         /* Updating the RCU protected object in place is necessary since
1606            this function gets called from atomic context.
1607            It is valid since all other updates also lead to an completely
1608            empty fifo */
1609         rcu_read_lock();
1610         plan = rcu_dereference(device->rs_plan_s);
1611         plan->total = 0;
1612         fifo_set(plan, 0);
1613         rcu_read_unlock();
1614 }
1615
1616 void start_resync_timer_fn(unsigned long data)
1617 {
1618         struct drbd_device *device = (struct drbd_device *) data;
1619         drbd_device_post_work(device, RS_START);
1620 }
1621
1622 static void do_start_resync(struct drbd_device *device)
1623 {
1624         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1625                 drbd_warn(device, "postponing start_resync ...\n");
1626                 device->start_resync_timer.expires = jiffies + HZ/10;
1627                 add_timer(&device->start_resync_timer);
1628                 return;
1629         }
1630
1631         drbd_start_resync(device, C_SYNC_SOURCE);
1632         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1633 }
1634
1635 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1636 {
1637         bool csums_after_crash_only;
1638         rcu_read_lock();
1639         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1640         rcu_read_unlock();
1641         return connection->agreed_pro_version >= 89 &&          /* supported? */
1642                 connection->csums_tfm &&                        /* configured? */
1643                 (csums_after_crash_only == 0                    /* use for each resync? */
1644                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1645 }
1646
1647 /**
1648  * drbd_start_resync() - Start the resync process
1649  * @device:     DRBD device.
1650  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1651  *
1652  * This function might bring you directly into one of the
1653  * C_PAUSED_SYNC_* states.
1654  */
1655 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1656 {
1657         struct drbd_peer_device *peer_device = first_peer_device(device);
1658         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1659         union drbd_state ns;
1660         int r;
1661
1662         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1663                 drbd_err(device, "Resync already running!\n");
1664                 return;
1665         }
1666
1667         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1668                 if (side == C_SYNC_TARGET) {
1669                         /* Since application IO was locked out during C_WF_BITMAP_T and
1670                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1671                            we check that we might make the data inconsistent. */
1672                         r = drbd_khelper(device, "before-resync-target");
1673                         r = (r >> 8) & 0xff;
1674                         if (r > 0) {
1675                                 drbd_info(device, "before-resync-target handler returned %d, "
1676                                          "dropping connection.\n", r);
1677                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1678                                 return;
1679                         }
1680                 } else /* C_SYNC_SOURCE */ {
1681                         r = drbd_khelper(device, "before-resync-source");
1682                         r = (r >> 8) & 0xff;
1683                         if (r > 0) {
1684                                 if (r == 3) {
1685                                         drbd_info(device, "before-resync-source handler returned %d, "
1686                                                  "ignoring. Old userland tools?", r);
1687                                 } else {
1688                                         drbd_info(device, "before-resync-source handler returned %d, "
1689                                                  "dropping connection.\n", r);
1690                                         conn_request_state(connection,
1691                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1692                                         return;
1693                                 }
1694                         }
1695                 }
1696         }
1697
1698         if (current == connection->worker.task) {
1699                 /* The worker should not sleep waiting for state_mutex,
1700                    that can take long */
1701                 if (!mutex_trylock(device->state_mutex)) {
1702                         set_bit(B_RS_H_DONE, &device->flags);
1703                         device->start_resync_timer.expires = jiffies + HZ/5;
1704                         add_timer(&device->start_resync_timer);
1705                         return;
1706                 }
1707         } else {
1708                 mutex_lock(device->state_mutex);
1709         }
1710         clear_bit(B_RS_H_DONE, &device->flags);
1711
1712         /* req_lock: serialize with drbd_send_and_submit() and others
1713          * global_state_lock: for stable sync-after dependencies */
1714         spin_lock_irq(&device->resource->req_lock);
1715         write_lock(&global_state_lock);
1716         /* Did some connection breakage or IO error race with us? */
1717         if (device->state.conn < C_CONNECTED
1718         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1719                 write_unlock(&global_state_lock);
1720                 spin_unlock_irq(&device->resource->req_lock);
1721                 mutex_unlock(device->state_mutex);
1722                 return;
1723         }
1724
1725         ns = drbd_read_state(device);
1726
1727         ns.aftr_isp = !_drbd_may_sync_now(device);
1728
1729         ns.conn = side;
1730
1731         if (side == C_SYNC_TARGET)
1732                 ns.disk = D_INCONSISTENT;
1733         else /* side == C_SYNC_SOURCE */
1734                 ns.pdsk = D_INCONSISTENT;
1735
1736         r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1737         ns = drbd_read_state(device);
1738
1739         if (ns.conn < C_CONNECTED)
1740                 r = SS_UNKNOWN_ERROR;
1741
1742         if (r == SS_SUCCESS) {
1743                 unsigned long tw = drbd_bm_total_weight(device);
1744                 unsigned long now = jiffies;
1745                 int i;
1746
1747                 device->rs_failed    = 0;
1748                 device->rs_paused    = 0;
1749                 device->rs_same_csum = 0;
1750                 device->rs_last_sect_ev = 0;
1751                 device->rs_total     = tw;
1752                 device->rs_start     = now;
1753                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1754                         device->rs_mark_left[i] = tw;
1755                         device->rs_mark_time[i] = now;
1756                 }
1757                 _drbd_pause_after(device);
1758                 /* Forget potentially stale cached per resync extent bit-counts.
1759                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1760                  * disabled, and know the disk state is ok. */
1761                 spin_lock(&device->al_lock);
1762                 lc_reset(device->resync);
1763                 device->resync_locked = 0;
1764                 device->resync_wenr = LC_FREE;
1765                 spin_unlock(&device->al_lock);
1766         }
1767         write_unlock(&global_state_lock);
1768         spin_unlock_irq(&device->resource->req_lock);
1769
1770         if (r == SS_SUCCESS) {
1771                 wake_up(&device->al_wait); /* for lc_reset() above */
1772                 /* reset rs_last_bcast when a resync or verify is started,
1773                  * to deal with potential jiffies wrap. */
1774                 device->rs_last_bcast = jiffies - HZ;
1775
1776                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1777                      drbd_conn_str(ns.conn),
1778                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1779                      (unsigned long) device->rs_total);
1780                 if (side == C_SYNC_TARGET) {
1781                         device->bm_resync_fo = 0;
1782                         device->use_csums = use_checksum_based_resync(connection, device);
1783                 } else {
1784                         device->use_csums = 0;
1785                 }
1786
1787                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1788                  * with w_send_oos, or the sync target will get confused as to
1789                  * how much bits to resync.  We cannot do that always, because for an
1790                  * empty resync and protocol < 95, we need to do it here, as we call
1791                  * drbd_resync_finished from here in that case.
1792                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1793                  * and from after_state_ch otherwise. */
1794                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1795                         drbd_gen_and_send_sync_uuid(peer_device);
1796
1797                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1798                         /* This still has a race (about when exactly the peers
1799                          * detect connection loss) that can lead to a full sync
1800                          * on next handshake. In 8.3.9 we fixed this with explicit
1801                          * resync-finished notifications, but the fix
1802                          * introduces a protocol change.  Sleeping for some
1803                          * time longer than the ping interval + timeout on the
1804                          * SyncSource, to give the SyncTarget the chance to
1805                          * detect connection loss, then waiting for a ping
1806                          * response (implicit in drbd_resync_finished) reduces
1807                          * the race considerably, but does not solve it. */
1808                         if (side == C_SYNC_SOURCE) {
1809                                 struct net_conf *nc;
1810                                 int timeo;
1811
1812                                 rcu_read_lock();
1813                                 nc = rcu_dereference(connection->net_conf);
1814                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1815                                 rcu_read_unlock();
1816                                 schedule_timeout_interruptible(timeo);
1817                         }
1818                         drbd_resync_finished(device);
1819                 }
1820
1821                 drbd_rs_controller_reset(device);
1822                 /* ns.conn may already be != device->state.conn,
1823                  * we may have been paused in between, or become paused until
1824                  * the timer triggers.
1825                  * No matter, that is handled in resync_timer_fn() */
1826                 if (ns.conn == C_SYNC_TARGET)
1827                         mod_timer(&device->resync_timer, jiffies);
1828
1829                 drbd_md_sync(device);
1830         }
1831         put_ldev(device);
1832         mutex_unlock(device->state_mutex);
1833 }
1834
1835 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1836 {
1837         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1838         device->rs_last_bcast = jiffies;
1839
1840         if (!get_ldev(device))
1841                 return;
1842
1843         drbd_bm_write_lazy(device, 0);
1844         if (resync_done && is_sync_state(device->state.conn))
1845                 drbd_resync_finished(device);
1846
1847         drbd_bcast_event(device, &sib);
1848         /* update timestamp, in case it took a while to write out stuff */
1849         device->rs_last_bcast = jiffies;
1850         put_ldev(device);
1851 }
1852
1853 static void drbd_ldev_destroy(struct drbd_device *device)
1854 {
1855         lc_destroy(device->resync);
1856         device->resync = NULL;
1857         lc_destroy(device->act_log);
1858         device->act_log = NULL;
1859
1860         __acquire(local);
1861         drbd_free_ldev(device->ldev);
1862         device->ldev = NULL;
1863         __release(local);
1864
1865         clear_bit(GOING_DISKLESS, &device->flags);
1866         wake_up(&device->misc_wait);
1867 }
1868
1869 static void go_diskless(struct drbd_device *device)
1870 {
1871         D_ASSERT(device, device->state.disk == D_FAILED);
1872         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1873          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1874          * the protected members anymore, though, so once put_ldev reaches zero
1875          * again, it will be safe to free them. */
1876
1877         /* Try to write changed bitmap pages, read errors may have just
1878          * set some bits outside the area covered by the activity log.
1879          *
1880          * If we have an IO error during the bitmap writeout,
1881          * we will want a full sync next time, just in case.
1882          * (Do we want a specific meta data flag for this?)
1883          *
1884          * If that does not make it to stable storage either,
1885          * we cannot do anything about that anymore.
1886          *
1887          * We still need to check if both bitmap and ldev are present, we may
1888          * end up here after a failed attach, before ldev was even assigned.
1889          */
1890         if (device->bitmap && device->ldev) {
1891                 /* An interrupted resync or similar is allowed to recounts bits
1892                  * while we detach.
1893                  * Any modifications would not be expected anymore, though.
1894                  */
1895                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1896                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1897                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1898                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1899                                 drbd_md_sync(device);
1900                         }
1901                 }
1902         }
1903
1904         drbd_force_state(device, NS(disk, D_DISKLESS));
1905 }
1906
/*
 * do_md_sync() - MD_SYNC device work: warn, then sync the meta data.
 *
 * Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
1913
1914 /* only called from drbd_worker thread, no locking */
1915 void __update_timing_details(
1916                 struct drbd_thread_timing_details *tdp,
1917                 unsigned int *cb_nr,
1918                 void *cb,
1919                 const char *fn, const unsigned int line)
1920 {
1921         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1922         struct drbd_thread_timing_details *td = tdp + i;
1923
1924         td->start_jif = jiffies;
1925         td->cb_addr = cb;
1926         td->caller_fn = fn;
1927         td->line = line;
1928         td->cb_nr = *cb_nr;
1929
1930         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1931         td = tdp + i;
1932         memset(td, 0, sizeof(*td));
1933
1934         ++(*cb_nr);
1935 }
1936
1937 static void do_device_work(struct drbd_device *device, const unsigned long todo)
1938 {
1939         if (test_bit(MD_SYNC, &todo))
1940                 do_md_sync(device);
1941         if (test_bit(RS_DONE, &todo) ||
1942             test_bit(RS_PROGRESS, &todo))
1943                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1944         if (test_bit(GO_DISKLESS, &todo))
1945                 go_diskless(device);
1946         if (test_bit(DESTROY_DISK, &todo))
1947                 drbd_ldev_destroy(device);
1948         if (test_bit(RS_START, &todo))
1949                 do_start_resync(device);
1950 }
1951
/* All device->flags bits that do_device_work() handles. */
#define DRBD_DEVICE_WORK_MASK	\
	((1UL << GO_DISKLESS) |	\
	 (1UL << DESTROY_DISK) |	\
	 (1UL << MD_SYNC) |	\
	 (1UL << RS_START) |	\
	 (1UL << RS_PROGRESS) |	\
	 (1UL << RS_DONE))
1960
1961 static unsigned long get_work_bits(unsigned long *flags)
1962 {
1963         unsigned long old, new;
1964         do {
1965                 old = *flags;
1966                 new = old & ~DRBD_DEVICE_WORK_MASK;
1967         } while (cmpxchg(flags, old, new) != old);
1968         return old & DRBD_DEVICE_WORK_MASK;
1969 }
1970
1971 static void do_unqueued_work(struct drbd_connection *connection)
1972 {
1973         struct drbd_peer_device *peer_device;
1974         int vnr;
1975
1976         rcu_read_lock();
1977         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1978                 struct drbd_device *device = peer_device->device;
1979                 unsigned long todo = get_work_bits(&device->flags);
1980                 if (!todo)
1981                         continue;
1982
1983                 kref_get(&device->kref);
1984                 rcu_read_unlock();
1985                 do_device_work(device, todo);
1986                 kref_put(&device->kref, drbd_destroy_device);
1987                 rcu_read_lock();
1988         }
1989         rcu_read_unlock();
1990 }
1991
1992 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1993 {
1994         spin_lock_irq(&queue->q_lock);
1995         list_splice_tail_init(&queue->q, work_list);
1996         spin_unlock_irq(&queue->q_lock);
1997         return !list_empty(work_list);
1998 }
1999
2000 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2001 {
2002         DEFINE_WAIT(wait);
2003         struct net_conf *nc;
2004         int uncork, cork;
2005
2006         dequeue_work_batch(&connection->sender_work, work_list);
2007         if (!list_empty(work_list))
2008                 return;
2009
2010         /* Still nothing to do?
2011          * Maybe we still need to close the current epoch,
2012          * even if no new requests are queued yet.
2013          *
2014          * Also, poke TCP, just in case.
2015          * Then wait for new work (or signal). */
2016         rcu_read_lock();
2017         nc = rcu_dereference(connection->net_conf);
2018         uncork = nc ? nc->tcp_cork : 0;
2019         rcu_read_unlock();
2020         if (uncork) {
2021                 mutex_lock(&connection->data.mutex);
2022                 if (connection->data.socket)
2023                         drbd_tcp_uncork(connection->data.socket);
2024                 mutex_unlock(&connection->data.mutex);
2025         }
2026
2027         for (;;) {
2028                 int send_barrier;
2029                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2030                 spin_lock_irq(&connection->resource->req_lock);
2031                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2032                 if (!list_empty(&connection->sender_work.q))
2033                         list_splice_tail_init(&connection->sender_work.q, work_list);
2034                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2035                 if (!list_empty(work_list) || signal_pending(current)) {
2036                         spin_unlock_irq(&connection->resource->req_lock);
2037                         break;
2038                 }
2039
2040                 /* We found nothing new to do, no to-be-communicated request,
2041                  * no other work item.  We may still need to close the last
2042                  * epoch.  Next incoming request epoch will be connection ->
2043                  * current transfer log epoch number.  If that is different
2044                  * from the epoch of the last request we communicated, it is
2045                  * safe to send the epoch separating barrier now.
2046                  */
2047                 send_barrier =
2048                         atomic_read(&connection->current_tle_nr) !=
2049                         connection->send.current_epoch_nr;
2050                 spin_unlock_irq(&connection->resource->req_lock);
2051
2052                 if (send_barrier)
2053                         maybe_send_barrier(connection,
2054                                         connection->send.current_epoch_nr + 1);
2055
2056                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2057                         break;
2058
2059                 /* drbd_send() may have called flush_signals() */
2060                 if (get_t_state(&connection->worker) != RUNNING)
2061                         break;
2062
2063                 schedule();
2064                 /* may be woken up for other things but new work, too,
2065                  * e.g. if the current epoch got closed.
2066                  * In which case we send the barrier above. */
2067         }
2068         finish_wait(&connection->sender_work.q_wait, &wait);
2069
2070         /* someone may have changed the config while we have been waiting above. */
2071         rcu_read_lock();
2072         nc = rcu_dereference(connection->net_conf);
2073         cork = nc ? nc->tcp_cork : 0;
2074         rcu_read_unlock();
2075         mutex_lock(&connection->data.mutex);
2076         if (connection->data.socket) {
2077                 if (cork)
2078                         drbd_tcp_cork(connection->data.socket);
2079                 else if (!uncork)
2080                         drbd_tcp_uncork(connection->data.socket);
2081         }
2082         mutex_unlock(&connection->data.mutex);
2083 }
2084
2085 int drbd_worker(struct drbd_thread *thi)
2086 {
2087         struct drbd_connection *connection = thi->connection;
2088         struct drbd_work *w = NULL;
2089         struct drbd_peer_device *peer_device;
2090         LIST_HEAD(work_list);
2091         int vnr;
2092
2093         while (get_t_state(thi) == RUNNING) {
2094                 drbd_thread_current_set_cpu(thi);
2095
2096                 if (list_empty(&work_list)) {
2097                         update_worker_timing_details(connection, wait_for_work);
2098                         wait_for_work(connection, &work_list);
2099                 }
2100
2101                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2102                         update_worker_timing_details(connection, do_unqueued_work);
2103                         do_unqueued_work(connection);
2104                 }
2105
2106                 if (signal_pending(current)) {
2107                         flush_signals(current);
2108                         if (get_t_state(thi) == RUNNING) {
2109                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2110                                 continue;
2111                         }
2112                         break;
2113                 }
2114
2115                 if (get_t_state(thi) != RUNNING)
2116                         break;
2117
2118                 if (!list_empty(&work_list)) {
2119                         w = list_first_entry(&work_list, struct drbd_work, list);
2120                         list_del_init(&w->list);
2121                         update_worker_timing_details(connection, w->cb);
2122                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2123                                 continue;
2124                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2125                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2126                 }
2127         }
2128
2129         do {
2130                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2131                         update_worker_timing_details(connection, do_unqueued_work);
2132                         do_unqueued_work(connection);
2133                 }
2134                 if (!list_empty(&work_list)) {
2135                         w = list_first_entry(&work_list, struct drbd_work, list);
2136                         list_del_init(&w->list);
2137                         update_worker_timing_details(connection, w->cb);
2138                         w->cb(w, 1);
2139                 } else
2140                         dequeue_work_batch(&connection->sender_work, &work_list);
2141         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2142
2143         rcu_read_lock();
2144         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2145                 struct drbd_device *device = peer_device->device;
2146                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2147                 kref_get(&device->kref);
2148                 rcu_read_unlock();
2149                 drbd_device_cleanup(device);
2150                 kref_put(&device->kref, drbd_destroy_device);
2151                 rcu_read_lock();
2152         }
2153         rcu_read_unlock();
2154
2155         return 0;
2156 }