/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
#include "drbd_vli.h"

#define PRO_FEATURES (FF_TRIM)

struct packet_info {
	enum drbd_packet cmd;
	unsigned int size;
	unsigned int vnr;
	void *data;
};

enum finish_epoch {
	FE_STILL_LIVE,
	FE_DESTROYED,
	FE_RECYCLED,
};

static int drbd_do_features(struct drbd_connection *connection);
static int drbd_do_auth(struct drbd_connection *connection);
static int drbd_disconnected(struct drbd_peer_device *);
static void conn_wait_active_ee_empty(struct drbd_connection *connection);
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * some helper functions to deal with single linked page lists,
 * page->private being our "next" pointer.
 */
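
/*
 * Illustration (editorial): a chain of three pages linked through
 * page->private, terminated by 0:
 *
 *	head -> [A] --private--> [B] --private--> [C] --private--> 0
 *
 * page_chain_del(&head, 2) returns A with B's link cleared (the caller
 * gets the two-page chain A -> B) and leaves head pointing at C.
 */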

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
	struct page *page;
	struct page *tmp;

	BUG_ON(!n);
	BUG_ON(!head);

	page = *head;

	if (!page)
		return NULL;

	while (page) {
		tmp = page_chain_next(page);
		if (--n == 0)
			break; /* found sufficient pages */
		if (tmp == NULL)
			/* insufficient pages, don't use any of them. */
			return NULL;
		page = tmp;
	}

	/* add end of list marker for the returned list */
	set_page_private(page, 0);
	/* actual return value, and adjustment of head */
	page = *head;
	*head = tmp;
	return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *tmp;
	int i = 1;
	while ((tmp = page_chain_next(page)))
		++i, page = tmp;
	if (len)
		*len = i;
	return page;
}

static int page_chain_free(struct page *page)
{
	struct page *tmp;
	int i = 0;
	page_chain_for_each_safe(page, tmp) {
		put_page(page);
		++i;
	}
	return i;
}

static void page_chain_add(struct page **head,
		struct page *chain_first, struct page *chain_last)
{
#if 1
	struct page *tmp;
	tmp = page_chain_tail(chain_first, NULL);
	BUG_ON(tmp != chain_last);
#endif

	/* add chain to head */
	set_page_private(chain_last, (unsigned long)*head);
	*head = chain_first;
}

static struct page *__drbd_alloc_pages(struct drbd_device *device,
				       unsigned int number)
{
	struct page *page = NULL;
	struct page *tmp = NULL;
	unsigned int i = 0;

	/* Yes, testing drbd_pp_vacant outside the lock is racy.
	 * So what. It saves a spin_lock. */
	if (drbd_pp_vacant >= number) {
		spin_lock(&drbd_pp_lock);
		page = page_chain_del(&drbd_pp_pool, number);
		if (page)
			drbd_pp_vacant -= number;
		spin_unlock(&drbd_pp_lock);
		if (page)
			return page;
	}

	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
	 * "criss-cross" setup, that might cause write-out on some other DRBD,
	 * which in turn might block on the other node at this very place.  */
	for (i = 0; i < number; i++) {
		tmp = alloc_page(GFP_TRY);
		if (!tmp)
			break;
		set_page_private(tmp, (unsigned long)page);
		page = tmp;
	}

	if (i == number)
		return page;

	/* Not enough pages immediately available this time.
	 * No need to jump around here, drbd_alloc_pages will retry this
	 * function "soon". */
	if (page) {
		tmp = page_chain_tail(page, NULL);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	return NULL;
}

static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
					   struct list_head *to_be_freed)
{
	struct drbd_peer_request *peer_req, *tmp;

	/* The EEs are always appended to the end of the list. Since
	   they are sent in order over the wire, they have to finish
	   in order. As soon as we see the first one that has not
	   finished, we can stop examining the list... */

	list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
		if (drbd_peer_req_has_active_page(peer_req))
			break;
		list_move(&peer_req->w.list, to_be_freed);
	}
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
{
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);
}

/**
 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 * @peer_device: DRBD device's peer device.
 * @number:      number of pages requested
 * @retry:       whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel.
 * Possibly retry until DRBD frees sufficient pages somewhere else.
 *
 * If this allocation would exceed the max_buffers setting, we throttle
 * allocation (schedule_timeout) to give the system some room to breathe.
 *
 * We do not use max-buffers as a hard limit, because it could lead to
 * congestion and further to a distributed deadlock during online-verify or
 * (checksum based) resync, if the max-buffers, socket buffer sizes and
 * resync-rate settings are mis-configured.
 *
 * Returns a page chain linked via page->private.
 */
struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
			      bool retry)
{
	struct drbd_device *device = peer_device->device;
	struct page *page = NULL;
	struct net_conf *nc;
	DEFINE_WAIT(wait);
	unsigned int mxb;

	rcu_read_lock();
	nc = rcu_dereference(peer_device->connection->net_conf);
	mxb = nc ? nc->max_buffers : 1000000;
	rcu_read_unlock();

	if (atomic_read(&device->pp_in_use) < mxb)
		page = __drbd_alloc_pages(device, number);

	while (page == NULL) {
		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

		drbd_kick_lo_and_reclaim_net(device);

		if (atomic_read(&device->pp_in_use) < mxb) {
			page = __drbd_alloc_pages(device, number);
			if (page)
				break;
		}

		if (!retry)
			break;

		if (signal_pending(current)) {
			drbd_warn(device, "drbd_alloc_pages interrupted!\n");
			break;
		}

		if (schedule_timeout(HZ/10) == 0)
			mxb = UINT_MAX;
	}
	finish_wait(&drbd_pp_wait, &wait);

	if (page)
		atomic_add(number, &device->pp_in_use);
	return page;
}
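
/*
 * Usage sketch (editorial): a typical caller allocates the pages backing
 * one peer request and is willing to sleep, e.g.
 *
 *	page = drbd_alloc_pages(peer_device, nr_pages, true);
 *
 * With retry == false, or if the waiter is signalled, the result may be
 * NULL and the caller must handle that.
 */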

/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 * It is also used from inside another spin_lock_irq(&resource->req_lock);
 * it either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
{
	atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
	int i;

	if (page == NULL)
		return;

	if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
		i = page_chain_free(page);
	else {
		struct page *tmp;
		tmp = page_chain_tail(page, &i);
		spin_lock(&drbd_pp_lock);
		page_chain_add(&drbd_pp_pool, page, tmp);
		drbd_pp_vacant += i;
		spin_unlock(&drbd_pp_lock);
	}
	i = atomic_sub_return(i, a);
	if (i < 0)
		drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
			is_net ? "pp_in_use_by_net" : "pp_in_use", i);
	wake_up(&drbd_pp_wait);
}
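
/*
 * Note (editorial): pp_in_use_by_net accounts for pages still attached to
 * peer requests on net_ee, presumably because they may still be referenced
 * by the network stack after the reply was queued; they are only reclaimed
 * once drbd_peer_req_has_active_page() reports no remaining users (see
 * reclaim_finished_net_peer_reqs() above).
 */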

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_peer_req()
 drbd_alloc_peer_req()
 drbd_free_peer_reqs()
 drbd_ee_fix_bhs()
 drbd_finish_peer_reqs()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

struct drbd_peer_request *
drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
		    unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;
	struct page *page = NULL;
	unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;

	if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
		return NULL;

	peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
	if (!peer_req) {
		if (!(gfp_mask & __GFP_NOWARN))
			drbd_err(device, "%s: allocation failed\n", __func__);
		return NULL;
	}

	if (has_payload && data_size) {
		page = drbd_alloc_pages(peer_device, nr_pages, (gfp_mask & __GFP_WAIT));
		if (!page)
			goto fail;
	}

	memset(peer_req, 0, sizeof(*peer_req));
	INIT_LIST_HEAD(&peer_req->w.list);
	drbd_clear_interval(&peer_req->i);
	peer_req->i.size = data_size;
	peer_req->i.sector = sector;
	peer_req->submit_jif = jiffies;
	peer_req->peer_device = peer_device;
	peer_req->pages = page;
	/*
	 * The block_id is opaque to the receiver.  It is not endianness
	 * converted, and sent back to the sender unchanged.
	 */
	peer_req->block_id = id;

	return peer_req;

 fail:
	mempool_free(peer_req, drbd_ee_mempool);
	return NULL;
}

void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
		       int is_net)
{
	might_sleep();
	if (peer_req->flags & EE_HAS_DIGEST)
		kfree(peer_req->digest);
	drbd_free_pages(device, peer_req->pages, is_net);
	D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
	D_ASSERT(device, drbd_interval_empty(&peer_req->i));
	if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
		peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
		drbd_al_complete_io(device, &peer_req->i);
	}
	mempool_free(peer_req, drbd_ee_mempool);
}

int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
{
	LIST_HEAD(work_list);
	struct drbd_peer_request *peer_req, *t;
	int count = 0;
	int is_net = list == &device->net_ee;

	spin_lock_irq(&device->resource->req_lock);
	list_splice_init(list, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		__drbd_free_peer_req(device, peer_req, is_net);
		count++;
	}
	return count;
}

/*
 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 */
static int drbd_finish_peer_reqs(struct drbd_device *device)
{
	LIST_HEAD(work_list);
	LIST_HEAD(reclaimed);
	struct drbd_peer_request *peer_req, *t;
	int err = 0;

	spin_lock_irq(&device->resource->req_lock);
	reclaim_finished_net_peer_reqs(device, &reclaimed);
	list_splice_init(&device->done_ee, &work_list);
	spin_unlock_irq(&device->resource->req_lock);

	list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
		drbd_free_net_peer_req(device, peer_req);

	/* possible callbacks here:
	 * e_end_block, and e_end_resync_block, e_send_superseded.
	 * all ignore the last argument.
	 */
	list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
		int err2;

		/* list_del not necessary, next/prev members not touched */
		err2 = peer_req->w.cb(&peer_req->w, !!err);
		if (!err)
			err = err2;
		drbd_free_peer_req(device, peer_req);
	}
	wake_up(&device->ee_wait);

	return err;
}

static void _drbd_wait_ee_list_empty(struct drbd_device *device,
				     struct list_head *head)
{
	DEFINE_WAIT(wait);

	/* avoids spin_lock/unlock
	 * and calling prepare_to_wait in the fast path */
	while (!list_empty(head)) {
		prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
		spin_unlock_irq(&device->resource->req_lock);
		io_schedule();
		finish_wait(&device->ee_wait, &wait);
		spin_lock_irq(&device->resource->req_lock);
	}
}
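
/*
 * Note (editorial): _drbd_wait_ee_list_empty() is entered with req_lock
 * held (see the locking rules above); it drops the lock around
 * io_schedule() and re-takes it before re-checking list_empty().
 */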

static void drbd_wait_ee_list_empty(struct drbd_device *device,
				    struct list_head *head)
{
	spin_lock_irq(&device->resource->req_lock);
	_drbd_wait_ee_list_empty(device, head);
	spin_unlock_irq(&device->resource->req_lock);
}

static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
{
	struct kvec iov = {
		.iov_base = buf,
		.iov_len = size,
	};
	struct msghdr msg = {
		.msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
	};
	return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
}
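
/*
 * Note (editorial): with flags == 0, the MSG_WAITALL default blocks until
 * all @size bytes have arrived (or an error/signal occurs), and
 * MSG_NOSIGNAL suppresses SIGPIPE. Callers passing MSG_DONTWAIT | MSG_PEEK
 * instead use this as a non-consuming probe, see drbd_socket_okay() below.
 */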

static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
{
	int rv;

	rv = drbd_recv_short(connection->data.socket, buf, size, 0);

	if (rv < 0) {
		if (rv == -ECONNRESET)
			drbd_info(connection, "sock was reset by peer\n");
		else if (rv != -ERESTARTSYS)
			drbd_err(connection, "sock_recvmsg returned %d\n", rv);
	} else if (rv == 0) {
		if (test_bit(DISCONNECT_SENT, &connection->flags)) {
			long t;
			rcu_read_lock();
			t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
			rcu_read_unlock();

			t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);

			if (t)
				goto out;
		}
		drbd_info(connection, "sock was shut down by peer\n");
	}

	if (rv != size)
		conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);

out:
	return rv;
}

static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv(connection, buf, size);
	if (err != size) {
		if (err >= 0)
			err = -EIO;
	} else
		err = 0;
	return err;
}

static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
{
	int err;

	err = drbd_recv_all(connection, buf, size);
	if (err && !signal_pending(current))
		drbd_warn(connection, "short read (expected size %d)\n", (int)size);
	return err;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
		unsigned int rcv)
{
	/* open coded SO_SNDBUF, SO_RCVBUF */
	if (snd) {
		sock->sk->sk_sndbuf = snd;
		sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
	}
	if (rcv) {
		sock->sk->sk_rcvbuf = rcv;
		sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
	}
}
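
/*
 * Note (editorial): setting SOCK_SNDBUF_LOCK / SOCK_RCVBUF_LOCK keeps the
 * kernel's buffer auto-tuning from overriding the explicitly configured
 * sizes, mirroring what a setsockopt(SO_SNDBUF / SO_RCVBUF) call would do.
 */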

static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 * for the outgoing connections.
	 * This is needed for multihomed hosts and to be
	 * able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 * a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}

struct accept_wait_data {
	struct drbd_connection *connection;
	struct socket *s_listen;
	struct completion door_bell;
	void (*original_sk_state_change)(struct sock *sk);
};

static void drbd_incoming_connection(struct sock *sk)
{
	struct accept_wait_data *ad = sk->sk_user_data;
	void (*state_change)(struct sock *sk);

	state_change = ad->original_sk_state_change;
	if (sk->sk_state == TCP_ESTABLISHED)
		complete(&ad->door_bell);
	state_change(sk);
}
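
/*
 * Note (editorial): drbd_incoming_connection() is installed as the listen
 * socket's sk_state_change callback in prepare_listen_socket() below. It
 * rings ad->door_bell once a connection reaches TCP_ESTABLISHED and then
 * chains to the original callback, so no state-change processing is lost.
 */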

static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int err, sndbuf_size, rcvbuf_size, my_addr_len;
	struct sockaddr_in6 my_addr;
	struct socket *s_listen;
	struct net_conf *nc;
	const char *what;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
	memcpy(&my_addr, &connection->my_addr, my_addr_len);

	what = "sock_create_kern";
	err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &s_listen);
	if (err) {
		s_listen = NULL;
		goto out;
	}

	s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);

	what = "bind before listen";
	err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
	if (err < 0)
		goto out;

	ad->s_listen = s_listen;
	write_lock_bh(&s_listen->sk->sk_callback_lock);
	ad->original_sk_state_change = s_listen->sk->sk_state_change;
	s_listen->sk->sk_state_change = drbd_incoming_connection;
	s_listen->sk->sk_user_data = ad;
	write_unlock_bh(&s_listen->sk->sk_callback_lock);

	what = "listen";
	err = s_listen->ops->listen(s_listen, 5);
	if (err < 0)
		goto out;

	return 0;
out:
	if (s_listen)
		sock_release(s_listen);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "%s failed, err = %d\n", what, err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	return -EIO;
}

static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
{
	write_lock_bh(&sk->sk_callback_lock);
	sk->sk_state_change = ad->original_sk_state_change;
	sk->sk_user_data = NULL;
	write_unlock_bh(&sk->sk_callback_lock);
}

static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
{
	int timeo, connect_int, err = 0;
	struct socket *s_estab = NULL;
	struct net_conf *nc;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	connect_int = nc->connect_int;
	rcu_read_unlock();

	timeo = connect_int * HZ;
	/* 28.5% random jitter */
	timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;

	err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
	if (err <= 0)
		return NULL;

	err = kernel_accept(ad->s_listen, &s_estab, 0);
	if (err < 0) {
		if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
			drbd_err(connection, "accept failed, err = %d\n", err);
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
		}
	}

	if (s_estab)
		unregister_state_change(s_estab->sk, ad);

	return s_estab;
}

static int decode_header(struct drbd_connection *, void *, struct packet_info *);

static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
			     enum drbd_packet cmd)
{
	if (!conn_prepare_command(connection, sock))
		return -EIO;
	return conn_send_command(connection, sock, cmd, 0, NULL, 0);
}

static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
{
	unsigned int header_size = drbd_header_size(connection);
	struct packet_info pi;
	struct net_conf *nc;
	int err;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return -EIO;
	}
	sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
	rcu_read_unlock();

	err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
	if (err != header_size) {
		if (err >= 0)
			err = -EIO;
		return err;
	}
	err = decode_header(connection, connection->data.rbuf, &pi);
	if (err)
		return err;
	return pi.cmd;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @sock:       pointer to the pointer to the socket.
 */
static bool drbd_socket_okay(struct socket **sock)
{
	int rr;
	char tb[4];

	if (!*sock)
		return false;

	rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

	if (rr > 0 || rr == -EAGAIN) {
		return true;
	} else {
		sock_release(*sock);
		*sock = NULL;
		return false;
	}
}
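
/*
 * Note (editorial): the 4-byte MSG_DONTWAIT | MSG_PEEK read above does not
 * consume any data. rr > 0 (peer already sent something) and rr == -EAGAIN
 * (connected but idle) both mean the socket is healthy; any other result
 * indicates a reset or shutdown, so the socket is released.
 */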

static bool connection_established(struct drbd_connection *connection,
				   struct socket **sock1,
				   struct socket **sock2)
{
	struct net_conf *nc;
	int timeout;
	bool ok;

	if (!*sock1 || !*sock2)
		return false;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
	rcu_read_unlock();
	schedule_timeout_interruptible(timeout);

	ok = drbd_socket_okay(sock1);
	ok = drbd_socket_okay(sock2) && ok;

	return ok;
}

/* Gets called if a connection is established, or if a new minor gets created
   in a connection */
int drbd_connected(struct drbd_peer_device *peer_device)
{
	struct drbd_device *device = peer_device->device;
	int err;

	atomic_set(&device->packet_seq, 0);
	device->peer_seq = 0;

	device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
		&peer_device->connection->cstate_mutex :
		&device->own_state_mutex;

	err = drbd_send_sync_param(peer_device);
	if (!err)
		err = drbd_send_sizes(peer_device, 0, 0);
	if (!err)
		err = drbd_send_uuids(peer_device);
	if (!err)
		err = drbd_send_current_state(peer_device);
	clear_bit(USE_DEGR_WFC_T, &device->flags);
	clear_bit(RESIZE_PENDING, &device->flags);
	atomic_set(&device->ap_in_flight, 0);
	mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
	return err;
}
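
/*
 * Note (editorial): DRBD uses two TCP connections per peer: a "data"
 * socket for the bulk replication stream and a "meta" socket for acks and
 * pings. In conn_connect() below, both sides connect and listen at the
 * same time; crossed connection attempts are resolved via the
 * P_INITIAL_DATA / P_INITIAL_META first packets, with a coin flip on
 * ambiguity.
 */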

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int conn_connect(struct drbd_connection *connection)
{
	struct drbd_socket sock, msock;
	struct drbd_peer_device *peer_device;
	struct net_conf *nc;
	int vnr, timeout, h;
	bool discard_my_data, ok;
	enum drbd_state_rv rv;
	struct accept_wait_data ad = {
		.connection = connection,
		.door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
	};

	clear_bit(DISCONNECT_SENT, &connection->flags);
	if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
		return -2;

	mutex_init(&sock.mutex);
	sock.sbuf = connection->data.sbuf;
	sock.rbuf = connection->data.rbuf;
	sock.socket = NULL;
	mutex_init(&msock.mutex);
	msock.sbuf = connection->meta.sbuf;
	msock.rbuf = connection->meta.rbuf;
	msock.socket = NULL;

	/* Assume that the peer only understands protocol 80 until we know better.  */
	connection->agreed_pro_version = 80;

	if (prepare_listen_socket(connection, &ad))
		return 0;

	do {
		struct socket *s;

		s = drbd_try_connect(connection);
		if (s) {
			if (!sock.socket) {
				sock.socket = s;
				send_first_packet(connection, &sock, P_INITIAL_DATA);
			} else if (!msock.socket) {
				clear_bit(RESOLVE_CONFLICTS, &connection->flags);
				msock.socket = s;
				send_first_packet(connection, &msock, P_INITIAL_META);
			} else {
				drbd_err(connection, "Logic error in conn_connect()\n");
				goto out_release_sockets;
			}
		}

		if (connection_established(connection, &sock.socket, &msock.socket))
			break;

retry:
		s = drbd_wait_for_connect(connection, &ad);
		if (s) {
			int fp = receive_first_packet(connection, s);
			drbd_socket_okay(&sock.socket);
			drbd_socket_okay(&msock.socket);
			switch (fp) {
			case P_INITIAL_DATA:
				if (sock.socket) {
					drbd_warn(connection, "initial packet S crossed\n");
					sock_release(sock.socket);
					sock.socket = s;
					goto randomize;
				}
				sock.socket = s;
				break;
			case P_INITIAL_META:
				set_bit(RESOLVE_CONFLICTS, &connection->flags);
				if (msock.socket) {
					drbd_warn(connection, "initial packet M crossed\n");
					sock_release(msock.socket);
					msock.socket = s;
					goto randomize;
				}
				msock.socket = s;
				break;
			default:
				drbd_warn(connection, "Error receiving initial packet\n");
				sock_release(s);
randomize:
				if (prandom_u32() & 1)
					goto retry;
			}
		}

		if (connection->cstate <= C_DISCONNECTING)
			goto out_release_sockets;
		if (signal_pending(current)) {
			flush_signals(current);
			smp_rmb();
			if (get_t_state(&connection->receiver) == EXITING)
				goto out_release_sockets;
		}

		ok = connection_established(connection, &sock.socket, &msock.socket);
	} while (!ok);

	if (ad.s_listen)
		sock_release(ad.s_listen);

	sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
	msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

	sock.socket->sk->sk_allocation = GFP_NOIO;
	msock.socket->sk->sk_allocation = GFP_NOIO;

	sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
	msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

	/* NOT YET ...
	 * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
	 * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
	 * first set it to the P_CONNECTION_FEATURES timeout,
	 * which we set to 4x the configured ping_timeout. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);

	sock.socket->sk->sk_sndtimeo =
	sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

	msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
	timeout = nc->timeout * HZ / 10;
	discard_my_data = nc->discard_my_data;
	rcu_read_unlock();

	msock.socket->sk->sk_sndtimeo = timeout;

	/* we don't want delays.
	 * we use TCP_CORK where appropriate, though */
	drbd_tcp_nodelay(sock.socket);
	drbd_tcp_nodelay(msock.socket);

	connection->data.socket = sock.socket;
	connection->meta.socket = msock.socket;
	connection->last_received = jiffies;

	h = drbd_do_features(connection);
	if (h <= 0)
		return h;

	if (connection->cram_hmac_tfm) {
		/* drbd_request_state(device, NS(conn, WFAuth)); */
		switch (drbd_do_auth(connection)) {
		case -1:
			drbd_err(connection, "Authentication of peer failed\n");
			return -1;
		case 0:
			drbd_err(connection, "Authentication of peer failed, trying again.\n");
			return 0;
		}
	}

	connection->data.socket->sk->sk_sndtimeo = timeout;
	connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

	if (drbd_send_protocol(connection) == -EOPNOTSUPP)
		return -1;

	/* Prevent a race between resync-handshake and
	 * being promoted to Primary.
	 *
	 * Grab and release the state mutex, so we know that any current
	 * drbd_set_role() is finished, and any incoming drbd_set_role
	 * will see the STATE_SENT flag, and wait for it to be cleared.
	 */
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_lock(peer_device->device->state_mutex);

	set_bit(STATE_SENT, &connection->flags);

	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
		mutex_unlock(peer_device->device->state_mutex);

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		kref_get(&device->kref);
		rcu_read_unlock();

		if (discard_my_data)
			set_bit(DISCARD_MY_DATA, &device->flags);
		else
			clear_bit(DISCARD_MY_DATA, &device->flags);

		drbd_connected(peer_device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
	if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
		clear_bit(STATE_SENT, &connection->flags);
		return 0;
	}

	drbd_thread_start(&connection->asender);

	mutex_lock(&connection->resource->conf_update);
	/* The discard_my_data flag is a single-shot modifier to the next
	 * connection attempt, the handshake of which is now well underway.
	 * No need for rcu style copying of the whole struct
	 * just to clear a single value. */
	connection->net_conf->discard_my_data = 0;
	mutex_unlock(&connection->resource->conf_update);

	return h;

out_release_sockets:
	if (ad.s_listen)
		sock_release(ad.s_listen);
	if (sock.socket)
		sock_release(sock.socket);
	if (msock.socket)
		sock_release(msock.socket);
	return -1;
}

static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
{
	unsigned int header_size = drbd_header_size(connection);

	if (header_size == sizeof(struct p_header100) &&
	    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
		struct p_header100 *h = header;
		if (h->pad != 0) {
			drbd_err(connection, "Header padding is not zero\n");
			return -EINVAL;
		}
		pi->vnr = be16_to_cpu(h->volume);
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
	} else if (header_size == sizeof(struct p_header95) &&
		   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
		struct p_header95 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be32_to_cpu(h->length);
		pi->vnr = 0;
	} else if (header_size == sizeof(struct p_header80) &&
		   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
		struct p_header80 *h = header;
		pi->cmd = be16_to_cpu(h->command);
		pi->size = be16_to_cpu(h->length);
		pi->vnr = 0;
	} else {
		drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
			 be32_to_cpu(*(__be32 *)header),
			 connection->agreed_pro_version);
		return -EINVAL;
	}
	pi->data = header + header_size;
	return 0;
}
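
/*
 * Note (editorial): three on-wire header formats are recognized, selected
 * by the agreed protocol version: p_header100 (32-bit magic
 * DRBD_MAGIC_100, carries a volume number), p_header95 (16-bit magic
 * DRBD_MAGIC_BIG, 32-bit length) and p_header80 (32-bit magic DRBD_MAGIC,
 * 16-bit length). The two older formats imply volume 0.
 */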

static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
{
	void *buffer = connection->data.rbuf;
	int err;

	err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
	if (err)
		return err;

	err = decode_header(connection, buffer, pi);
	connection->last_received = jiffies;

	return err;
}

static void drbd_flush(struct drbd_connection *connection)
{
	int rv;
	struct drbd_peer_device *peer_device;
	int vnr;

	if (connection->resource->write_ordering >= WO_bdev_flush) {
		rcu_read_lock();
		idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
			struct drbd_device *device = peer_device->device;

			if (!get_ldev(device))
				continue;
			kref_get(&device->kref);
			rcu_read_unlock();

			/* Right now, we have only this one synchronous code path
			 * for flushes between request epochs.
			 * We may want to make those asynchronous,
			 * or at least parallelize the flushes to the volume devices.
			 */
			device->flush_jif = jiffies;
			set_bit(FLUSH_PENDING, &device->flags);
			rv = blkdev_issue_flush(device->ldev->backing_bdev,
					GFP_NOIO, NULL);
			clear_bit(FLUSH_PENDING, &device->flags);
			if (rv) {
				drbd_info(device, "local disk flush failed with status %d\n", rv);
				/* would rather check on EOPNOTSUPP, but that is not reliable.
				 * don't try again for ANY return value != 0
				 * if (rv == -EOPNOTSUPP) */
				drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
			}
			put_ldev(device);
			kref_put(&device->kref, drbd_destroy_device);

			rcu_read_lock();
			if (rv)
				break;
		}
		rcu_read_unlock();
	}
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
					       struct drbd_epoch *epoch,
					       enum epoch_event ev)
{
	int epoch_size;
	struct drbd_epoch *next_epoch;
	enum finish_epoch rv = FE_STILL_LIVE;

	spin_lock(&connection->epoch_lock);
	do {
		next_epoch = NULL;

		epoch_size = atomic_read(&epoch->epoch_size);

		switch (ev & ~EV_CLEANUP) {
		case EV_PUT:
			atomic_dec(&epoch->active);
			break;
		case EV_GOT_BARRIER_NR:
			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
			break;
		case EV_BECAME_LAST:
			/* nothing to do */
			break;
		}

		if (epoch_size != 0 &&
		    atomic_read(&epoch->active) == 0 &&
		    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
			if (!(ev & EV_CLEANUP)) {
				spin_unlock(&connection->epoch_lock);
				drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
				spin_lock(&connection->epoch_lock);
			}
#if 0
			/* FIXME: dec unacked on connection, once we have
			 * something to count pending connection packets in. */
			if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
				dec_unacked(epoch->connection);
#endif

			if (connection->current_epoch != epoch) {
				next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
				list_del(&epoch->list);
				ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
				connection->epochs--;
				kfree(epoch);

				if (rv == FE_STILL_LIVE)
					rv = FE_DESTROYED;
			} else {
				epoch->flags = 0;
				atomic_set(&epoch->epoch_size, 0);
				/* atomic_set(&epoch->active, 0); is already zero */
				if (rv == FE_STILL_LIVE)
					rv = FE_RECYCLED;
			}
		}

		if (!next_epoch)
			break;

		epoch = next_epoch;
	} while (1);

	spin_unlock(&connection->epoch_lock);

	return rv;
}
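
/*
 * Note (editorial): the loop above re-applies the event to next_epoch
 * because destroying one epoch makes its successor the last one
 * (EV_BECAME_LAST), which may in turn allow that epoch to be finished as
 * well.
 */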

static enum write_ordering_e
max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
{
	struct disk_conf *dc;

	dc = rcu_dereference(bdev->disk_conf);

	if (wo == WO_bdev_flush && !dc->disk_flushes)
		wo = WO_drain_io;
	if (wo == WO_drain_io && !dc->disk_drain)
		wo = WO_none;

	return wo;
}
/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @resource:   DRBD resource.
 * @bdev:       backing device to consider in addition, may be NULL.
 * @wo:         write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
			      enum write_ordering_e wo)
{
	struct drbd_device *device;
	enum write_ordering_e pwo;
	int vnr;
	static char *write_ordering_str[] = {
		[WO_none] = "none",
		[WO_drain_io] = "drain",
		[WO_bdev_flush] = "flush",
	};

	pwo = resource->write_ordering;
	if (wo != WO_bdev_flush)
		wo = min(pwo, wo);
	rcu_read_lock();
	idr_for_each_entry(&resource->devices, device, vnr) {
		if (get_ldev(device)) {
			wo = max_allowed_wo(device->ldev, wo);
			if (device->ldev == bdev)
				bdev = NULL;
			put_ldev(device);
		}
	}

	if (bdev)
		wo = max_allowed_wo(bdev, wo);

	rcu_read_unlock();

	resource->write_ordering = wo;
	if (pwo != resource->write_ordering || wo == WO_bdev_flush)
		drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
}
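
/*
 * Example (editorial): with the current method at WO_bdev_flush and a
 * caller requesting WO_drain_io (as drbd_flush() does after a failed
 * flush), wo = min(pwo, wo) keeps the weaker WO_drain_io; a device whose
 * disk_conf disables draining demotes it further to WO_none via
 * max_allowed_wo().
 */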

/**
 * drbd_submit_peer_request() - Submit the bios for a peer request
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @rw:         flag field, see bio->bi_rw
 * @fault_type: DRBD fault injection type, passed to drbd_generic_make_request()
 *
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios,
 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
 *  single page to an empty bio (which should never happen and likely indicates
 *  that the lower level IO stack is in some way broken). This has been observed
 *  on certain Xen deployments.
 */
1365 /* TODO allocate from our own bio_set. */
1366 int drbd_submit_peer_request(struct drbd_device *device,
1367                              struct drbd_peer_request *peer_req,
1368                              const unsigned rw, const int fault_type)
1369 {
1370         struct bio *bios = NULL;
1371         struct bio *bio;
1372         struct page *page = peer_req->pages;
1373         sector_t sector = peer_req->i.sector;
1374         unsigned data_size = peer_req->i.size;
1375         unsigned n_bios = 0;
1376         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1377         int err = -ENOMEM;
1378
1379         if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1380                 /* wait for all pending IO completions, before we start
1381                  * zeroing things out. */
1382                 conn_wait_active_ee_empty(first_peer_device(device)->connection);
1383                 /* add it to the active list now,
1384                  * so we can find it to present it in debugfs */
1385                 peer_req->submit_jif = jiffies;
1386                 peer_req->flags |= EE_SUBMITTED;
1387                 spin_lock_irq(&device->resource->req_lock);
1388                 list_add_tail(&peer_req->w.list, &device->active_ee);
1389                 spin_unlock_irq(&device->resource->req_lock);
1390                 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1391                         sector, data_size >> 9, GFP_NOIO, false))
1392                         peer_req->flags |= EE_WAS_ERROR;
1393                 drbd_endio_write_sec_final(peer_req);
1394                 return 0;
1395         }
1396
1397         /* Discards don't have any payload.
1398          * But the scsi layer still expects a bio_vec it can use internally,
1399          * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1400         if (peer_req->flags & EE_IS_TRIM)
1401                 nr_pages = 1;
1402
1403         /* In most cases, we will only need one bio.  But in case the lower
1404          * level restrictions happen to be different at this offset on this
1405          * side than those of the sending peer, we may need to submit the
1406          * request in more than one bio.
1407          *
1408          * Plain bio_alloc is good enough here, this is no DRBD internally
1409          * generated bio, but a bio allocated on behalf of the peer.
1410          */
1411 next_bio:
1412         bio = bio_alloc(GFP_NOIO, nr_pages);
1413         if (!bio) {
1414                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1415                 goto fail;
1416         }
1417         /* > peer_req->i.sector, unless this is the first bio */
1418         bio->bi_iter.bi_sector = sector;
1419         bio->bi_bdev = device->ldev->backing_bdev;
1420         bio->bi_rw = rw;
1421         bio->bi_private = peer_req;
1422         bio->bi_end_io = drbd_peer_request_endio;
1423
1424         bio->bi_next = bios;
1425         bios = bio;
1426         ++n_bios;
1427
1428         if (rw & REQ_DISCARD) {
1429                 bio->bi_iter.bi_size = data_size;
1430                 goto submit;
1431         }
1432
1433         page_chain_for_each(page) {
1434                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1435                 if (!bio_add_page(bio, page, len, 0)) {
1436                         /* A single page must always be possible!
1437                          * But in case it fails anyways,
1438                          * we deal with it, and complain (below). */
1439                         if (bio->bi_vcnt == 0) {
1440                                 drbd_err(device,
1441                                         "bio_add_page failed for len=%u, "
1442                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1443                                         len, (uint64_t)bio->bi_iter.bi_sector);
1444                                 err = -ENOSPC;
1445                                 goto fail;
1446                         }
1447                         goto next_bio;
1448                 }
1449                 data_size -= len;
1450                 sector += len >> 9;
1451                 --nr_pages;
1452         }
1453         D_ASSERT(device, data_size == 0);
1454 submit:
1455         D_ASSERT(device, page == NULL);
1456
1457         atomic_set(&peer_req->pending_bios, n_bios);
1458         /* for debugfs: update timestamp, mark as submitted */
1459         peer_req->submit_jif = jiffies;
1460         peer_req->flags |= EE_SUBMITTED;
1461         do {
1462                 bio = bios;
1463                 bios = bios->bi_next;
1464                 bio->bi_next = NULL;
1465
1466                 drbd_generic_make_request(device, fault_type, bio);
1467         } while (bios);
1468         return 0;
1469
1470 fail:
1471         while (bios) {
1472                 bio = bios;
1473                 bios = bios->bi_next;
1474                 bio_put(bio);
1475         }
1476         return err;
1477 }
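/*
 * Worked example (editorial, not part of the original source): for an
 * 8192 byte write at sector 1000, nr_pages is 2.  If bio_add_page()
 * accepts both pages, a single bio covers sectors 1000..1015.  Should
 * the second page be rejected (e.g. by a queue segment limit), the
 * loop has already advanced data_size to 4096 and sector to 1008, so
 * "goto next_bio" opens a second bio starting at sector 1008 that
 * retries the remaining page, and both bios are then submitted via
 * the bi_next chain.
 */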
1478
1479 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1480                                              struct drbd_peer_request *peer_req)
1481 {
1482         struct drbd_interval *i = &peer_req->i;
1483
1484         drbd_remove_interval(&device->write_requests, i);
1485         drbd_clear_interval(i);
1486
1487         /* Wake up any processes waiting for this peer request to complete.  */
1488         if (i->waiting)
1489                 wake_up(&device->misc_wait);
1490 }
1491
1492 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1493 {
1494         struct drbd_peer_device *peer_device;
1495         int vnr;
1496
1497         rcu_read_lock();
1498         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1499                 struct drbd_device *device = peer_device->device;
1500
1501                 kref_get(&device->kref);
1502                 rcu_read_unlock();
1503                 drbd_wait_ee_list_empty(device, &device->active_ee);
1504                 kref_put(&device->kref, drbd_destroy_device);
1505                 rcu_read_lock();
1506         }
1507         rcu_read_unlock();
1508 }
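/*
 * Editorial note: the loop above uses the usual "hold a reference
 * across a sleeping section" idiom: pin the device with kref_get()
 * while still under rcu_read_lock(), drop the RCU read lock so that
 * drbd_wait_ee_list_empty() may sleep, then drop the reference and
 * re-enter the RCU section before the idr iterator advances.
 */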
1509
1510 static struct drbd_peer_device *
1511 conn_peer_device(struct drbd_connection *connection, int volume_number)
1512 {
1513         return idr_find(&connection->peer_devices, volume_number);
1514 }
1515
1516 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1517 {
1518         int rv;
1519         struct p_barrier *p = pi->data;
1520         struct drbd_epoch *epoch;
1521
1522         /* FIXME these are unacked on connection,
1523          * not a specific (peer)device.
1524          */
1525         connection->current_epoch->barrier_nr = p->barrier;
1526         connection->current_epoch->connection = connection;
1527         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1528
1529         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1530          * the activity log, which means it would not be resynced in case the
1531          * R_PRIMARY crashes now.
1532          * Therefore we must send the barrier_ack after the barrier request was
1533          * completed. */
1534         switch (connection->resource->write_ordering) {
1535         case WO_none:
1536                 if (rv == FE_RECYCLED)
1537                         return 0;
1538
1539                 /* receiver context, in the writeout path of the other node.
1540                  * avoid potential distributed deadlock */
1541                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1542                 if (epoch)
1543                         break;
1544                 else
1545                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1546                         /* Fall through */
1547
1548         case WO_bdev_flush:
1549         case WO_drain_io:
1550                 conn_wait_active_ee_empty(connection);
1551                 drbd_flush(connection);
1552
1553                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1554                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1555                         if (epoch)
1556                                 break;
1557                 }
1558
1559                 return 0;
1560         default:
1561                 drbd_err(connection, "Strangeness in resource->write_ordering %d\n",
1562                          connection->resource->write_ordering);
1563                 return -EIO;
1564         }
1565
1566         epoch->flags = 0;
1567         atomic_set(&epoch->epoch_size, 0);
1568         atomic_set(&epoch->active, 0);
1569
1570         spin_lock(&connection->epoch_lock);
1571         if (atomic_read(&connection->current_epoch->epoch_size)) {
1572                 list_add(&epoch->list, &connection->current_epoch->list);
1573                 connection->current_epoch = epoch;
1574                 connection->epochs++;
1575         } else {
1576                 /* The current_epoch got recycled while we allocated this one... */
1577                 kfree(epoch);
1578         }
1579         spin_unlock(&connection->epoch_lock);
1580
1581         return 0;
1582 }
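/*
 * Editorial summary of the write-ordering fallback above:
 *   WO_none, epoch allocated    -> use the new epoch (unless the current
 *                                  one turned out empty and is recycled)
 *   WO_none, allocation failed  -> warn and fall through, i.e. behave
 *                                  like a flush for this barrier
 *   WO_bdev_flush / WO_drain_io -> drain active_ee and flush; allocate a
 *                                  fresh epoch only if the current one is
 *                                  still non-empty, otherwise recycle it.
 */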
1583
1584 /* used from receive_RSDataReply (recv_resync_read)
1585  * and from receive_Data */
1586 static struct drbd_peer_request *
1587 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1588               struct packet_info *pi) __must_hold(local)
1589 {
1590         struct drbd_device *device = peer_device->device;
1591         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1592         struct drbd_peer_request *peer_req;
1593         struct page *page;
1594         int digest_size, err;
1595         unsigned int data_size = pi->size, ds;
1596         void *dig_in = peer_device->connection->int_dig_in;
1597         void *dig_vv = peer_device->connection->int_dig_vv;
1598         unsigned long *data;
1599         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1600
1601         digest_size = 0;
1602         if (!trim && peer_device->connection->peer_integrity_tfm) {
1603                 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1604                 /*
1605                  * FIXME: Receive the incoming digest into the receive buffer
1606                  *        here, together with its struct p_data?
1607                  */
1608                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1609                 if (err)
1610                         return NULL;
1611                 data_size -= digest_size;
1612         }
1613
1614         if (trim) {
1615                 D_ASSERT(peer_device, data_size == 0);
1616                 data_size = be32_to_cpu(trim->size);
1617         }
1618
1619         if (!expect(IS_ALIGNED(data_size, 512)))
1620                 return NULL;
1621         /* prepare for larger trim requests. */
1622         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1623                 return NULL;
1624
1625         /* even though we trust our peer,
1626          * we sometimes have to double check. */
1627         if (sector + (data_size>>9) > capacity) {
1628                 drbd_err(device, "request from peer beyond end of local disk: "
1629                         "capacity: %llus < sector: %llus + size: %u\n",
1630                         (unsigned long long)capacity,
1631                         (unsigned long long)sector, data_size);
1632                 return NULL;
1633         }
1634
1635         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1636          * "criss-cross" setup, that might cause write-out on some other DRBD,
1637          * which in turn might block on the other node at this very place.  */
1638         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1639         if (!peer_req)
1640                 return NULL;
1641
1642         peer_req->flags |= EE_WRITE;
1643         if (trim)
1644                 return peer_req;
1645
1646         ds = data_size;
1647         page = peer_req->pages;
1648         page_chain_for_each(page) {
1649                 unsigned len = min_t(int, ds, PAGE_SIZE);
1650                 data = kmap(page);
1651                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1652                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1653                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1654                         data[0] = data[0] ^ (unsigned long)-1;
1655                 }
1656                 kunmap(page);
1657                 if (err) {
1658                         drbd_free_peer_req(device, peer_req);
1659                         return NULL;
1660                 }
1661                 ds -= len;
1662         }
1663
1664         if (digest_size) {
1665                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1666                 if (memcmp(dig_in, dig_vv, digest_size)) {
1667                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1668                                 (unsigned long long)sector, data_size);
1669                         drbd_free_peer_req(device, peer_req);
1670                         return NULL;
1671                 }
1672         }
1673         device->recv_cnt += data_size >> 9;
1674         return peer_req;
1675 }
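/*
 * On-wire layout handled above (editorial summary): with a configured
 * peer integrity transform, a data packet carries [ digest | payload ],
 * so the payload length is pi->size minus the digest size; a P_TRIM
 * packet carries no payload at all, and its size is taken from the
 * p_trim header instead.
 */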
1676
1677 /* drbd_drain_block() just takes a data block
1678  * out of the socket input buffer, and discards it.
1679  */
1680 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1681 {
1682         struct page *page;
1683         int err = 0;
1684         void *data;
1685
1686         if (!data_size)
1687                 return 0;
1688
1689         page = drbd_alloc_pages(peer_device, 1, 1);
1690
1691         data = kmap(page);
1692         while (data_size) {
1693                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1694
1695                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1696                 if (err)
1697                         break;
1698                 data_size -= len;
1699         }
1700         kunmap(page);
1701         drbd_free_pages(peer_device->device, page, 0);
1702         return err;
1703 }
1704
1705 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1706                            sector_t sector, int data_size)
1707 {
1708         struct bio_vec bvec;
1709         struct bvec_iter iter;
1710         struct bio *bio;
1711         int digest_size, err, expect;
1712         void *dig_in = peer_device->connection->int_dig_in;
1713         void *dig_vv = peer_device->connection->int_dig_vv;
1714
1715         digest_size = 0;
1716         if (peer_device->connection->peer_integrity_tfm) {
1717                 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1718                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1719                 if (err)
1720                         return err;
1721                 data_size -= digest_size;
1722         }
1723
1724         /* optimistically update recv_cnt.  if receiving fails below,
1725          * we disconnect anyways, and counters will be reset. */
1726         peer_device->device->recv_cnt += data_size>>9;
1727
1728         bio = req->master_bio;
1729         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1730
1731         bio_for_each_segment(bvec, bio, iter) {
1732                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1733                 expect = min_t(int, data_size, bvec.bv_len);
1734                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1735                 kunmap(bvec.bv_page);
1736                 if (err)
1737                         return err;
1738                 data_size -= expect;
1739         }
1740
1741         if (digest_size) {
1742                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1743                 if (memcmp(dig_in, dig_vv, digest_size)) {
1744                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1745                         return -EINVAL;
1746                 }
1747         }
1748
1749         D_ASSERT(peer_device->device, data_size == 0);
1750         return 0;
1751 }
1752
1753 /*
1754  * e_end_resync_block() is called in asender context via
1755  * drbd_finish_peer_reqs().
1756  */
1757 static int e_end_resync_block(struct drbd_work *w, int unused)
1758 {
1759         struct drbd_peer_request *peer_req =
1760                 container_of(w, struct drbd_peer_request, w);
1761         struct drbd_peer_device *peer_device = peer_req->peer_device;
1762         struct drbd_device *device = peer_device->device;
1763         sector_t sector = peer_req->i.sector;
1764         int err;
1765
1766         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1767
1768         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1769                 drbd_set_in_sync(device, sector, peer_req->i.size);
1770                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1771         } else {
1772                 /* Record failure to sync */
1773                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1774
1775                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1776         }
1777         dec_unacked(device);
1778
1779         return err;
1780 }
1781
1782 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1783                             struct packet_info *pi) __releases(local)
1784 {
1785         struct drbd_device *device = peer_device->device;
1786         struct drbd_peer_request *peer_req;
1787
1788         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1789         if (!peer_req)
1790                 goto fail;
1791
1792         dec_rs_pending(device);
1793
1794         inc_unacked(device);
1795         /* corresponding dec_unacked() in e_end_resync_block()
1796          * or in _drbd_clear_done_ee */
1797
1798         peer_req->w.cb = e_end_resync_block;
1799         peer_req->submit_jif = jiffies;
1800
1801         spin_lock_irq(&device->resource->req_lock);
1802         list_add_tail(&peer_req->w.list, &device->sync_ee);
1803         spin_unlock_irq(&device->resource->req_lock);
1804
1805         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1806         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1807                 return 0;
1808
1809         /* don't care for the reason here */
1810         drbd_err(device, "submit failed, triggering re-connect\n");
1811         spin_lock_irq(&device->resource->req_lock);
1812         list_del(&peer_req->w.list);
1813         spin_unlock_irq(&device->resource->req_lock);
1814
1815         drbd_free_peer_req(device, peer_req);
1816 fail:
1817         put_ldev(device);
1818         return -EIO;
1819 }
1820
1821 static struct drbd_request *
1822 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1823              sector_t sector, bool missing_ok, const char *func)
1824 {
1825         struct drbd_request *req;
1826
1827         /* Request object according to our peer */
1828         req = (struct drbd_request *)(unsigned long)id;
1829         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1830                 return req;
1831         if (!missing_ok) {
1832                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1833                         (unsigned long)id, (unsigned long long)sector);
1834         }
1835         return NULL;
1836 }
1837
1838 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1839 {
1840         struct drbd_peer_device *peer_device;
1841         struct drbd_device *device;
1842         struct drbd_request *req;
1843         sector_t sector;
1844         int err;
1845         struct p_data *p = pi->data;
1846
1847         peer_device = conn_peer_device(connection, pi->vnr);
1848         if (!peer_device)
1849                 return -EIO;
1850         device = peer_device->device;
1851
1852         sector = be64_to_cpu(p->sector);
1853
1854         spin_lock_irq(&device->resource->req_lock);
1855         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1856         spin_unlock_irq(&device->resource->req_lock);
1857         if (unlikely(!req))
1858                 return -EIO;
1859
1860         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1861          * special casing it there for the various failure cases.
1862          * still no race with drbd_fail_pending_reads */
1863         err = recv_dless_read(peer_device, req, sector, pi->size);
1864         if (!err)
1865                 req_mod(req, DATA_RECEIVED);
1866         /* else: nothing. handled from drbd_disconnect...
1867          * I don't think we may complete this just yet
1868          * in case we are "on-disconnect: freeze" */
1869
1870         return err;
1871 }
1872
1873 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1874 {
1875         struct drbd_peer_device *peer_device;
1876         struct drbd_device *device;
1877         sector_t sector;
1878         int err;
1879         struct p_data *p = pi->data;
1880
1881         peer_device = conn_peer_device(connection, pi->vnr);
1882         if (!peer_device)
1883                 return -EIO;
1884         device = peer_device->device;
1885
1886         sector = be64_to_cpu(p->sector);
1887         D_ASSERT(device, p->block_id == ID_SYNCER);
1888
1889         if (get_ldev(device)) {
1890                 /* data is submitted to disk within recv_resync_read.
1891                  * corresponding put_ldev done below on error,
1892                  * or in drbd_peer_request_endio. */
1893                 err = recv_resync_read(peer_device, sector, pi);
1894         } else {
1895                 if (__ratelimit(&drbd_ratelimit_state))
1896                         drbd_err(device, "Can not write resync data to local disk.\n");
1897
1898                 err = drbd_drain_block(peer_device, pi->size);
1899
1900                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1901         }
1902
1903         atomic_add(pi->size >> 9, &device->rs_sect_in);
1904
1905         return err;
1906 }
1907
1908 static void restart_conflicting_writes(struct drbd_device *device,
1909                                        sector_t sector, int size)
1910 {
1911         struct drbd_interval *i;
1912         struct drbd_request *req;
1913
1914         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1915                 if (!i->local)
1916                         continue;
1917                 req = container_of(i, struct drbd_request, i);
1918                 if (req->rq_state & RQ_LOCAL_PENDING ||
1919                     !(req->rq_state & RQ_POSTPONED))
1920                         continue;
1921                 /* as it is RQ_POSTPONED, this will cause it to
1922                  * be queued on the retry workqueue. */
1923                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1924         }
1925 }
1926
1927 /*
1928  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1929  */
1930 static int e_end_block(struct drbd_work *w, int cancel)
1931 {
1932         struct drbd_peer_request *peer_req =
1933                 container_of(w, struct drbd_peer_request, w);
1934         struct drbd_peer_device *peer_device = peer_req->peer_device;
1935         struct drbd_device *device = peer_device->device;
1936         sector_t sector = peer_req->i.sector;
1937         int err = 0, pcmd;
1938
1939         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1940                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1941                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1942                                 device->state.conn <= C_PAUSED_SYNC_T &&
1943                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1944                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1945                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1946                         if (pcmd == P_RS_WRITE_ACK)
1947                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1948                 } else {
1949                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1950                         /* we expect it to be marked out of sync anyways...
1951                          * maybe assert this?  */
1952                 }
1953                 dec_unacked(device);
1954         }
1955
1956         /* we delete from the conflict detection hash _after_ we sent out the
1957          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1958         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1959                 spin_lock_irq(&device->resource->req_lock);
1960                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1961                 drbd_remove_epoch_entry_interval(device, peer_req);
1962                 if (peer_req->flags & EE_RESTART_REQUESTS)
1963                         restart_conflicting_writes(device, sector, peer_req->i.size);
1964                 spin_unlock_irq(&device->resource->req_lock);
1965         } else
1966                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1967
1968         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1969
1970         return err;
1971 }
1972
1973 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1974 {
1975         struct drbd_peer_request *peer_req =
1976                 container_of(w, struct drbd_peer_request, w);
1977         struct drbd_peer_device *peer_device = peer_req->peer_device;
1978         int err;
1979
1980         err = drbd_send_ack(peer_device, ack, peer_req);
1981         dec_unacked(peer_device->device);
1982
1983         return err;
1984 }
1985
1986 static int e_send_superseded(struct drbd_work *w, int unused)
1987 {
1988         return e_send_ack(w, P_SUPERSEDED);
1989 }
1990
1991 static int e_send_retry_write(struct drbd_work *w, int unused)
1992 {
1993         struct drbd_peer_request *peer_req =
1994                 container_of(w, struct drbd_peer_request, w);
1995         struct drbd_connection *connection = peer_req->peer_device->connection;
1996
1997         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1998                              P_RETRY_WRITE : P_SUPERSEDED);
1999 }
2000
2001 static bool seq_greater(u32 a, u32 b)
2002 {
2003         /*
2004          * We assume 32-bit wrap-around here.
2005          * For 24-bit wrap-around, we would have to shift:
2006          *  a <<= 8; b <<= 8;
2007          */
2008         return (s32)a - (s32)b > 0;
2009 }
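/*
 * Worked examples (editorial):
 *   seq_greater(5, 3):          (s32)5 - (s32)3 == 2 > 0  -> true
 *   seq_greater(1, 0xffffffff): 1 - (-1) == 2 > 0         -> true (wrapped)
 *   seq_greater(0xffffffff, 1): -1 - 1 == -2              -> false
 * That is, a counts as newer iff it lies less than 2^31 ahead of b,
 * modulo 2^32.
 */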
2010
2011 static u32 seq_max(u32 a, u32 b)
2012 {
2013         return seq_greater(a, b) ? a : b;
2014 }
2015
2016 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2017 {
2018         struct drbd_device *device = peer_device->device;
2019         unsigned int newest_peer_seq;
2020
2021         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2022                 spin_lock(&device->peer_seq_lock);
2023                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2024                 device->peer_seq = newest_peer_seq;
2025                 spin_unlock(&device->peer_seq_lock);
2026                 /* wake up only if we actually changed device->peer_seq */
2027                 if (peer_seq == newest_peer_seq)
2028                         wake_up(&device->seq_wait);
2029         }
2030 }
2031
2032 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2033 {
2034         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2035 }
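/*
 * Worked example (editorial): with s1 = 8, l1 = 4096 (sectors 8..15)
 * and s2 = 12, l2 = 4096 (sectors 12..19), neither 8 + 8 <= 12 nor
 * 8 >= 12 + 8 holds, so the two ranges overlap.
 */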
2036
2037 /* maybe change sync_ee into interval trees as well? */
2038 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2039 {
2040         struct drbd_peer_request *rs_req;
2041         bool rv = false;
2042
2043         spin_lock_irq(&device->resource->req_lock);
2044         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2045                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2046                              rs_req->i.sector, rs_req->i.size)) {
2047                         rv = true;
2048                         break;
2049                 }
2050         }
2051         spin_unlock_irq(&device->resource->req_lock);
2052
2053         return rv;
2054 }
2055
2056 /* Called from receive_Data.
2057  * Synchronize packets on sock with packets on msock.
2058  *
2059  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2060  * packet traveling on msock, they are still processed in the order they have
2061  * been sent.
2062  *
2063  * Note: we don't care for Ack packets overtaking P_DATA packets.
2064  *
2065  * In case packet_seq is larger than device->peer_seq number, there are
2066  * outstanding packets on the msock. We wait for them to arrive.
2067  * In case we are the logically next packet, we update device->peer_seq
2068  * ourselves. Correctly handles 32bit wrap around.
2069  *
2070  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2071  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2072  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2073  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2074  *
2075  * returns 0 if we may process the packet,
2076  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2077 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2078 {
2079         struct drbd_device *device = peer_device->device;
2080         DEFINE_WAIT(wait);
2081         long timeout;
2082         int ret = 0, tp;
2083
2084         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2085                 return 0;
2086
2087         spin_lock(&device->peer_seq_lock);
2088         for (;;) {
2089                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2090                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2091                         break;
2092                 }
2093
2094                 if (signal_pending(current)) {
2095                         ret = -ERESTARTSYS;
2096                         break;
2097                 }
2098
2099                 rcu_read_lock();
2100                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2101                 rcu_read_unlock();
2102
2103                 if (!tp)
2104                         break;
2105
2106                 /* Only need to wait if two_primaries is enabled */
2107                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2108                 spin_unlock(&device->peer_seq_lock);
2109                 rcu_read_lock();
2110                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2111                 rcu_read_unlock();
2112                 timeout = schedule_timeout(timeout);
2113                 spin_lock(&device->peer_seq_lock);
2114                 if (!timeout) {
2115                         ret = -ETIMEDOUT;
2116                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2117                         break;
2118                 }
2119         }
2120         spin_unlock(&device->peer_seq_lock);
2121         finish_wait(&device->seq_wait, &wait);
2122         return ret;
2123 }
2124
2125 /* see also bio_flags_to_wire()
2126  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2127  * flags and back; the peer may run a different kernel whose REQ_* bits differ. */
2128 static unsigned long wire_flags_to_bio(u32 dpf)
2129 {
2130         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2131                 (dpf & DP_FUA ? REQ_FUA : 0) |
2132                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2133                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2134 }
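/*
 * Editorial sketch of the inverse direction.  bio_flags_to_wire() lives
 * elsewhere in DRBD; the body below is an assumption-based illustration
 * (hypothetical name, valid only if the DP_*/REQ_* semantics map
 * one-to-one as they do above), kept in #if 0 so it is never compiled:
 */
#if 0
static u32 wire_flags_from_bio(unsigned long bi_rw)     /* hypothetical */
{
        return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
                (bi_rw & REQ_FUA ? DP_FUA : 0) |
                (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
                (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
}
#endif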
2135
2136 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2137                                     unsigned int size)
2138 {
2139         struct drbd_interval *i;
2140
2141     repeat:
2142         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2143                 struct drbd_request *req;
2144                 struct bio_and_error m;
2145
2146                 if (!i->local)
2147                         continue;
2148                 req = container_of(i, struct drbd_request, i);
2149                 if (!(req->rq_state & RQ_POSTPONED))
2150                         continue;
2151                 req->rq_state &= ~RQ_POSTPONED;
2152                 __req_mod(req, NEG_ACKED, &m);
2153                 spin_unlock_irq(&device->resource->req_lock);
2154                 if (m.bio)
2155                         complete_master_bio(device, &m);
2156                 spin_lock_irq(&device->resource->req_lock);
2157                 goto repeat;
2158         }
2159 }
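/*
 * Editorial note: the unlock / complete_master_bio() / lock / "goto
 * repeat" dance above is the standard pattern for calling a function
 * that may sleep while iterating a structure protected by req_lock:
 * drop the lock, do the work, retake the lock, and restart the scan
 * because the interval tree may have changed in the meantime.
 */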
2160
2161 static int handle_write_conflicts(struct drbd_device *device,
2162                                   struct drbd_peer_request *peer_req)
2163 {
2164         struct drbd_connection *connection = peer_req->peer_device->connection;
2165         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2166         sector_t sector = peer_req->i.sector;
2167         const unsigned int size = peer_req->i.size;
2168         struct drbd_interval *i;
2169         bool equal;
2170         int err;
2171
2172         /*
2173          * Inserting the peer request into the write_requests tree will prevent
2174          * new conflicting local requests from being added.
2175          */
2176         drbd_insert_interval(&device->write_requests, &peer_req->i);
2177
2178     repeat:
2179         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2180                 if (i == &peer_req->i)
2181                         continue;
2182                 if (i->completed)
2183                         continue;
2184
2185                 if (!i->local) {
2186                         /*
2187                          * Our peer has sent a conflicting remote request; this
2188                          * should not happen in a two-node setup.  Wait for the
2189                          * earlier peer request to complete.
2190                          */
2191                         err = drbd_wait_misc(device, i);
2192                         if (err)
2193                                 goto out;
2194                         goto repeat;
2195                 }
2196
2197                 equal = i->sector == sector && i->size == size;
2198                 if (resolve_conflicts) {
2199                         /*
2200                          * If the peer request is fully contained within the
2201                          * overlapping request, it can be considered overwritten
2202                          * and thus superseded; otherwise, it will be retried
2203                          * once all overlapping requests have completed.
2204                          */
2205                         bool superseded = i->sector <= sector && i->sector +
2206                                        (i->size >> 9) >= sector + (size >> 9);
2207
2208                         if (!equal)
2209                                 drbd_alert(device, "Concurrent writes detected: "
2210                                                "local=%llus +%u, remote=%llus +%u, "
2211                                                "assuming %s came first\n",
2212                                           (unsigned long long)i->sector, i->size,
2213                                           (unsigned long long)sector, size,
2214                                           superseded ? "local" : "remote");
2215
2216                         peer_req->w.cb = superseded ? e_send_superseded :
2217                                                    e_send_retry_write;
2218                         list_add_tail(&peer_req->w.list, &device->done_ee);
2219                         wake_asender(connection);
2220
2221                         err = -ENOENT;
2222                         goto out;
2223                 } else {
2224                         struct drbd_request *req =
2225                                 container_of(i, struct drbd_request, i);
2226
2227                         if (!equal)
2228                                 drbd_alert(device, "Concurrent writes detected: "
2229                                                "local=%llus +%u, remote=%llus +%u\n",
2230                                           (unsigned long long)i->sector, i->size,
2231                                           (unsigned long long)sector, size);
2232
2233                         if (req->rq_state & RQ_LOCAL_PENDING ||
2234                             !(req->rq_state & RQ_POSTPONED)) {
2235                                 /*
2236                                  * Wait for the node with the discard flag to
2237                                  * decide if this request has been superseded
2238                                  * or needs to be retried.
2239                                  * Requests that have been superseded will
2240                                  * disappear from the write_requests tree.
2241                                  *
2242                                  * In addition, wait for the conflicting
2243                                  * request to finish locally before submitting
2244                                  * the conflicting peer request.
2245                                  */
2246                                 err = drbd_wait_misc(device, &req->i);
2247                                 if (err) {
2248                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2249                                         fail_postponed_requests(device, sector, size);
2250                                         goto out;
2251                                 }
2252                                 goto repeat;
2253                         }
2254                         /*
2255                          * Remember to restart the conflicting requests after
2256                          * the new peer request has completed.
2257                          */
2258                         peer_req->flags |= EE_RESTART_REQUESTS;
2259                 }
2260         }
2261         err = 0;
2262
2263     out:
2264         if (err)
2265                 drbd_remove_epoch_entry_interval(device, peer_req);
2266         return err;
2267 }
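/*
 * Worked example for the "superseded" test above (editorial): a local
 * request i covering sector 96, size 8192 spans sectors 96..111; an
 * incoming peer write at sector 100, size 4096 spans sectors 100..107.
 * Since 96 <= 100 and 96 + 16 >= 100 + 8, the peer write is fully
 * contained, counts as superseded, and is answered with P_SUPERSEDED
 * via e_send_superseded().
 */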
2268
2269 /* mirrored write */
2270 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2271 {
2272         struct drbd_peer_device *peer_device;
2273         struct drbd_device *device;
2274         struct net_conf *nc;
2275         sector_t sector;
2276         struct drbd_peer_request *peer_req;
2277         struct p_data *p = pi->data;
2278         u32 peer_seq = be32_to_cpu(p->seq_num);
2279         int rw = WRITE;
2280         u32 dp_flags;
2281         int err, tp;
2282
2283         peer_device = conn_peer_device(connection, pi->vnr);
2284         if (!peer_device)
2285                 return -EIO;
2286         device = peer_device->device;
2287
2288         if (!get_ldev(device)) {
2289                 int err2;
2290
2291                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2292                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2293                 atomic_inc(&connection->current_epoch->epoch_size);
2294                 err2 = drbd_drain_block(peer_device, pi->size);
2295                 if (!err)
2296                         err = err2;
2297                 return err;
2298         }
2299
2300         /*
2301          * Corresponding put_ldev done either below (on various errors), or in
2302          * drbd_peer_request_endio, if we successfully submit the data at the
2303          * end of this function.
2304          */
2305
2306         sector = be64_to_cpu(p->sector);
2307         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2308         if (!peer_req) {
2309                 put_ldev(device);
2310                 return -EIO;
2311         }
2312
2313         peer_req->w.cb = e_end_block;
2314         peer_req->submit_jif = jiffies;
2315         peer_req->flags |= EE_APPLICATION;
2316
2317         dp_flags = be32_to_cpu(p->dp_flags);
2318         rw |= wire_flags_to_bio(dp_flags);
2319         if (pi->cmd == P_TRIM) {
2320                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2321                 peer_req->flags |= EE_IS_TRIM;
2322                 if (!blk_queue_discard(q))
2323                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2324                 D_ASSERT(peer_device, peer_req->i.size > 0);
2325                 D_ASSERT(peer_device, rw & REQ_DISCARD);
2326                 D_ASSERT(peer_device, peer_req->pages == NULL);
2327         } else if (peer_req->pages == NULL) {
2328                 D_ASSERT(device, peer_req->i.size == 0);
2329                 D_ASSERT(device, dp_flags & DP_FLUSH);
2330         }
2331
2332         if (dp_flags & DP_MAY_SET_IN_SYNC)
2333                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2334
2335         spin_lock(&connection->epoch_lock);
2336         peer_req->epoch = connection->current_epoch;
2337         atomic_inc(&peer_req->epoch->epoch_size);
2338         atomic_inc(&peer_req->epoch->active);
2339         spin_unlock(&connection->epoch_lock);
2340
2341         rcu_read_lock();
2342         nc = rcu_dereference(peer_device->connection->net_conf);
2343         tp = nc->two_primaries;
2344         if (peer_device->connection->agreed_pro_version < 100) {
2345                 switch (nc->wire_protocol) {
2346                 case DRBD_PROT_C:
2347                         dp_flags |= DP_SEND_WRITE_ACK;
2348                         break;
2349                 case DRBD_PROT_B:
2350                         dp_flags |= DP_SEND_RECEIVE_ACK;
2351                         break;
2352                 }
2353         }
2354         rcu_read_unlock();
2355
2356         if (dp_flags & DP_SEND_WRITE_ACK) {
2357                 peer_req->flags |= EE_SEND_WRITE_ACK;
2358                 inc_unacked(device);
2359                 /* corresponding dec_unacked() in e_end_block()
2360                  * or in _drbd_clear_done_ee */
2361         }
2362
2363         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2364                 /* I really don't like it that the receiver thread
2365                  * sends on the msock, but anyways */
2366                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2367         }
2368
2369         if (tp) {
2370                 /* two primaries implies protocol C */
2371                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2372                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2373                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2374                 if (err)
2375                         goto out_interrupted;
2376                 spin_lock_irq(&device->resource->req_lock);
2377                 err = handle_write_conflicts(device, peer_req);
2378                 if (err) {
2379                         spin_unlock_irq(&device->resource->req_lock);
2380                         if (err == -ENOENT) {
2381                                 put_ldev(device);
2382                                 return 0;
2383                         }
2384                         goto out_interrupted;
2385                 }
2386         } else {
2387                 update_peer_seq(peer_device, peer_seq);
2388                 spin_lock_irq(&device->resource->req_lock);
2389         }
2390         /* if we use the zeroout fallback code, we process synchronously
2391          * and wait for all pending requests, i.e. for active_ee
2392          * to become empty, in drbd_submit_peer_request();
2393          * better not add ourselves here. */
2394         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2395                 list_add_tail(&peer_req->w.list, &device->active_ee);
2396         spin_unlock_irq(&device->resource->req_lock);
2397
2398         if (device->state.conn == C_SYNC_TARGET)
2399                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2400
2401         if (device->state.pdsk < D_INCONSISTENT) {
2402         /* In case we have the only disk of the cluster: mark this range out of sync on the peer. */
2403                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2404                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2405                 drbd_al_begin_io(device, &peer_req->i);
2406                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2407         }
2408
2409         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2410         if (!err)
2411                 return 0;
2412
2413         /* don't care for the reason here */
2414         drbd_err(device, "submit failed, triggering re-connect\n");
2415         spin_lock_irq(&device->resource->req_lock);
2416         list_del(&peer_req->w.list);
2417         drbd_remove_epoch_entry_interval(device, peer_req);
2418         spin_unlock_irq(&device->resource->req_lock);
2419         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2420                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2421                 drbd_al_complete_io(device, &peer_req->i);
2422         }
2423
2424 out_interrupted:
2425         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2426         put_ldev(device);
2427         drbd_free_peer_req(device, peer_req);
2428         return err;
2429 }
2430
2431 /* We may throttle resync if the lower device seems to be busy
2432  * and the current sync rate is above c_min_rate.
2433  *
2434  * To decide whether or not the lower device is busy, we use a scheme similar
2435  * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
2436  * (more than 64 sectors) of activity we cannot account for with our own resync
2437  * activity, it obviously is "busy".
2438  *
2439  * The current sync rate used here uses only the most recent two step marks,
2440  * to have a short time average so we can react faster.
2441  */
2442 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2443                 bool throttle_if_app_is_waiting)
2444 {
2445         struct lc_element *tmp;
2446         bool throttle = drbd_rs_c_min_rate_throttle(device);
2447
2448         if (!throttle || throttle_if_app_is_waiting)
2449                 return throttle;
2450
2451         spin_lock_irq(&device->al_lock);
2452         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2453         if (tmp) {
2454                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2455                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2456                         throttle = false;
2457                 /* Do not slow down if app IO is already waiting for this extent,
2458                  * and our progress is necessary for application IO to complete. */
2459         }
2460         spin_unlock_irq(&device->al_lock);
2461
2462         return throttle;
2463 }
2464
2465 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2466 {
2467         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2468         unsigned long db, dt, dbdt;
2469         unsigned int c_min_rate;
2470         int curr_events;
2471
2472         rcu_read_lock();
2473         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2474         rcu_read_unlock();
2475
2476         /* feature disabled? */
2477         if (c_min_rate == 0)
2478                 return false;
2479
2480         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2481                       (int)part_stat_read(&disk->part0, sectors[1]) -
2482                         atomic_read(&device->rs_sect_ev);
2483
2484         if (atomic_read(&device->ap_actlog_cnt)
2485             || curr_events - device->rs_last_events > 64) {
2486                 unsigned long rs_left;
2487                 int i;
2488
2489                 device->rs_last_events = curr_events;
2490
2491                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2492                  * approx. */
2493                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2494
2495                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2496                         rs_left = device->ov_left;
2497                 else
2498                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2499
2500                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2501                 if (!dt)
2502                         dt++;
2503                 db = device->rs_mark_left[i] - rs_left;
2504                 dbdt = Bit2KB(db/dt);
2505
2506                 if (dbdt > c_min_rate)
2507                         return true;
2508         }
2509         return false;
2510 }
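/*
 * Worked example (editorial, numbers assumed): say a recent sync mark
 * recorded rs_mark_left[i] = 262144 bitmap bits, rs_left is now
 * 131072 bits, and dt = 4 seconds.  Then db = 131072 bits,
 * db/dt = 32768 bits/s, and with 4 KiB covered per bitmap bit
 * dbdt = Bit2KB(32768) = 131072 KiB/s = 128 MiB/s; resync is
 * throttled only if that exceeds the configured c_min_rate.
 */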
2511
2512 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2513 {
2514         struct drbd_peer_device *peer_device;
2515         struct drbd_device *device;
2516         sector_t sector;
2517         sector_t capacity;
2518         struct drbd_peer_request *peer_req;
2519         struct digest_info *di = NULL;
2520         int size, verb;
2521         unsigned int fault_type;
2522         struct p_block_req *p = pi->data;
2523
2524         peer_device = conn_peer_device(connection, pi->vnr);
2525         if (!peer_device)
2526                 return -EIO;
2527         device = peer_device->device;
2528         capacity = drbd_get_capacity(device->this_bdev);
2529
2530         sector = be64_to_cpu(p->sector);
2531         size   = be32_to_cpu(p->blksize);
2532
2533         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2534                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2535                                 (unsigned long long)sector, size);
2536                 return -EINVAL;
2537         }
2538         if (sector + (size>>9) > capacity) {
2539                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2540                                 (unsigned long long)sector, size);
2541                 return -EINVAL;
2542         }
2543
2544         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2545                 verb = 1;
2546                 switch (pi->cmd) {
2547                 case P_DATA_REQUEST:
2548                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2549                         break;
2550                 case P_RS_DATA_REQUEST:
2551                 case P_CSUM_RS_REQUEST:
2552                 case P_OV_REQUEST:
2553                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2554                         break;
2555                 case P_OV_REPLY:
2556                         verb = 0;
2557                         dec_rs_pending(device);
2558                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2559                         break;
2560                 default:
2561                         BUG();
2562                 }
2563                 if (verb && __ratelimit(&drbd_ratelimit_state))
2564                         drbd_err(device, "Can not satisfy peer's read request, "
2565                             "no local data.\n");
2566
2567                 /* drain possible payload */
2568                 return drbd_drain_block(peer_device, pi->size);
2569         }
2570
2571         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2572          * "criss-cross" setup, that might cause write-out on some other DRBD,
2573          * which in turn might block on the other node at this very place.  */
2574         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2575                         true /* has real payload */, GFP_NOIO);
2576         if (!peer_req) {
2577                 put_ldev(device);
2578                 return -ENOMEM;
2579         }
2580
2581         switch (pi->cmd) {
2582         case P_DATA_REQUEST:
2583                 peer_req->w.cb = w_e_end_data_req;
2584                 fault_type = DRBD_FAULT_DT_RD;
2585                 /* application IO, don't drbd_rs_begin_io */
2586                 peer_req->flags |= EE_APPLICATION;
2587                 goto submit;
2588
2589         case P_RS_DATA_REQUEST:
2590                 peer_req->w.cb = w_e_end_rsdata_req;
2591                 fault_type = DRBD_FAULT_RS_RD;
2592                 /* used in the sector offset progress display */
2593                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2594                 break;
2595
2596         case P_OV_REPLY:
2597         case P_CSUM_RS_REQUEST:
2598                 fault_type = DRBD_FAULT_RS_RD;
2599                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2600                 if (!di)
2601                         goto out_free_e;
2602
2603                 di->digest_size = pi->size;
2604                 di->digest = (((char *)di)+sizeof(struct digest_info));
2605
2606                 peer_req->digest = di;
2607                 peer_req->flags |= EE_HAS_DIGEST;
2608
2609                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2610                         goto out_free_e;
2611
2612                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2613                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2614                         peer_req->w.cb = w_e_end_csum_rs_req;
2615                         /* used in the sector offset progress display */
2616                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2617                         /* remember to report stats in drbd_resync_finished */
2618                         device->use_csums = true;
2619                 } else if (pi->cmd == P_OV_REPLY) {
2620                         /* track progress, we may need to throttle */
2621                         atomic_add(size >> 9, &device->rs_sect_in);
2622                         peer_req->w.cb = w_e_end_ov_reply;
2623                         dec_rs_pending(device);
2624                         /* drbd_rs_begin_io done when we sent this request,
2625                          * but accounting still needs to be done. */
2626                         goto submit_for_resync;
2627                 }
2628                 break;
2629
2630         case P_OV_REQUEST:
2631                 if (device->ov_start_sector == ~(sector_t)0 &&
2632                     peer_device->connection->agreed_pro_version >= 90) {
2633                         unsigned long now = jiffies;
2634                         int i;
2635                         device->ov_start_sector = sector;
2636                         device->ov_position = sector;
2637                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2638                         device->rs_total = device->ov_left;
2639                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2640                                 device->rs_mark_left[i] = device->ov_left;
2641                                 device->rs_mark_time[i] = now;
2642                         }
2643                         drbd_info(device, "Online Verify start sector: %llu\n",
2644                                         (unsigned long long)sector);
2645                 }
2646                 peer_req->w.cb = w_e_end_ov_req;
2647                 fault_type = DRBD_FAULT_RS_RD;
2648                 break;
2649
2650         default:
2651                 BUG();
2652         }
2653
2654         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2655          * wrt the receiver, but it is not as straightforward as it may seem.
2656          * Various places in the resync start and stop logic assume resync
2657          * requests are processed in order, requeuing this on the worker thread
2658          * introduces a bunch of new code for synchronization between threads.
2659          *
2660          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2661          * "forever", throttling after drbd_rs_begin_io will lock that extent
2662          * for application writes for the same time.  For now, just throttle
2663          * here, where the rest of the code expects the receiver to sleep for
2664          * a while, anyways.
2665          */
2666
2667         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2668          * this defers syncer requests for some time, before letting at least
2669  * one request through.  The resync controller on the receiving side
2670          * will adapt to the incoming rate accordingly.
2671          *
2672          * We cannot throttle here if remote is Primary/SyncTarget:
2673          * we would also throttle its application reads.
2674          * In that case, throttling is done on the SyncTarget only.
2675          */
2676
2677         /* Even though this may be a resync request, we do add to "read_ee";
2678          * "sync_ee" is only used for resync WRITEs.
2679          * Add to list early, so debugfs can find this request
2680          * even if we have to sleep below. */
2681         spin_lock_irq(&device->resource->req_lock);
2682         list_add_tail(&peer_req->w.list, &device->read_ee);
2683         spin_unlock_irq(&device->resource->req_lock);
2684
2685         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2686         if (device->state.peer != R_PRIMARY &&
2687             drbd_rs_should_slow_down(device, sector, false))
2688                 schedule_timeout_uninterruptible(HZ/10);
2689         update_receiver_timing_details(connection, drbd_rs_begin_io);
2690         if (drbd_rs_begin_io(device, sector))
2691                 goto out_free_e;
2692
2693 submit_for_resync:
2694         atomic_add(size >> 9, &device->rs_sect_ev);
2695
2696 submit:
2697         update_receiver_timing_details(connection, drbd_submit_peer_request);
2698         inc_unacked(device);
2699         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2700                 return 0;
2701
2702         /* don't care for the reason here */
2703         drbd_err(device, "submit failed, triggering re-connect\n");
2704
2705 out_free_e:
2706         spin_lock_irq(&device->resource->req_lock);
2707         list_del(&peer_req->w.list);
2708         spin_unlock_irq(&device->resource->req_lock);
2709         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2710
2711         put_ldev(device);
2712         drbd_free_peer_req(device, peer_req);
2713         return -EIO;
2714 }
2715
2716 /**
2717  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2718  */
2719 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2720 {
2721         struct drbd_device *device = peer_device->device;
2722         int self, peer, rv = -100;
2723         unsigned long ch_self, ch_peer;
2724         enum drbd_after_sb_p after_sb_0p;
2725
2726         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2727         peer = device->p_uuid[UI_BITMAP] & 1;
2728
2729         ch_peer = device->p_uuid[UI_SIZE];
2730         ch_self = device->comm_bm_set;
2731
2732         rcu_read_lock();
2733         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2734         rcu_read_unlock();
2735         switch (after_sb_0p) {
2736         case ASB_CONSENSUS:
2737         case ASB_DISCARD_SECONDARY:
2738         case ASB_CALL_HELPER:
2739         case ASB_VIOLENTLY:
2740                 drbd_err(device, "Configuration error.\n");
2741                 break;
2742         case ASB_DISCONNECT:
2743                 break;
2744         case ASB_DISCARD_YOUNGER_PRI:
2745                 if (self == 0 && peer == 1) {
2746                         rv = -1;
2747                         break;
2748                 }
2749                 if (self == 1 && peer == 0) {
2750                         rv =  1;
2751                         break;
2752                 }
2753                 /* Else fall through to one of the other strategies... */
2754         case ASB_DISCARD_OLDER_PRI:
2755                 if (self == 0 && peer == 1) {
2756                         rv = 1;
2757                         break;
2758                 }
2759                 if (self == 1 && peer == 0) {
2760                         rv = -1;
2761                         break;
2762                 }
2763                 /* Else fall through to one of the other strategies... */
2764                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2765                      "Using discard-least-changes instead\n");
2766         case ASB_DISCARD_ZERO_CHG:
2767                 if (ch_peer == 0 && ch_self == 0) {
2768                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2769                                 ? -1 : 1;
2770                         break;
2771                 } else {
2772                         if (ch_peer == 0) { rv =  1; break; }
2773                         if (ch_self == 0) { rv = -1; break; }
2774                 }
2775                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2776                         break;
2777         case ASB_DISCARD_LEAST_CHG:
2778                 if      (ch_self < ch_peer)
2779                         rv = -1;
2780                 else if (ch_self > ch_peer)
2781                         rv =  1;
2782                 else /* ( ch_self == ch_peer ) */
2783                      /* Well, then use something else. */
2784                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2785                                 ? -1 : 1;
2786                 break;
2787         case ASB_DISCARD_LOCAL:
2788                 rv = -1;
2789                 break;
2790         case ASB_DISCARD_REMOTE:
2791                 rv =  1;
2792         }
2793
2794         return rv;
2795 }
2796
2797 /**
2798  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
 * @peer_device: DRBD peer device to recover.
 *
 * Return: same convention as drbd_asb_recover_0p().
2799  */
2800 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2801 {
2802         struct drbd_device *device = peer_device->device;
2803         int hg, rv = -100;
2804         enum drbd_after_sb_p after_sb_1p;
2805
2806         rcu_read_lock();
2807         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2808         rcu_read_unlock();
2809         switch (after_sb_1p) {
2810         case ASB_DISCARD_YOUNGER_PRI:
2811         case ASB_DISCARD_OLDER_PRI:
2812         case ASB_DISCARD_LEAST_CHG:
2813         case ASB_DISCARD_LOCAL:
2814         case ASB_DISCARD_REMOTE:
2815         case ASB_DISCARD_ZERO_CHG:
2816                 drbd_err(device, "Configuration error.\n");
2817                 break;
2818         case ASB_DISCONNECT:
2819                 break;
2820         case ASB_CONSENSUS:
2821                 hg = drbd_asb_recover_0p(peer_device);
2822                 if (hg == -1 && device->state.role == R_SECONDARY)
2823                         rv = hg;
2824                 if (hg == 1  && device->state.role == R_PRIMARY)
2825                         rv = hg;
2826                 break;
2827         case ASB_VIOLENTLY:
2828                 rv = drbd_asb_recover_0p(peer_device);
2829                 break;
2830         case ASB_DISCARD_SECONDARY:
2831                 return device->state.role == R_PRIMARY ? 1 : -1;
2832         case ASB_CALL_HELPER:
2833                 hg = drbd_asb_recover_0p(peer_device);
2834                 if (hg == -1 && device->state.role == R_PRIMARY) {
2835                         enum drbd_state_rv rv2;
2836
2837                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2838                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2839                           * we do not need to wait for the after state change work either. */
2840                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2841                         if (rv2 != SS_SUCCESS) {
2842                                 drbd_khelper(device, "pri-lost-after-sb");
2843                         } else {
2844                                 drbd_warn(device, "Successfully gave up primary role.\n");
2845                                 rv = hg;
2846                         }
2847                 } else
2848                         rv = hg;
2849         }
2850
2851         return rv;
2852 }
2853
2854 /**
2855  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
 * @peer_device: DRBD peer device to recover.
 *
 * Return: same convention as drbd_asb_recover_0p().
2856  */
2857 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2858 {
2859         struct drbd_device *device = peer_device->device;
2860         int hg, rv = -100;
2861         enum drbd_after_sb_p after_sb_2p;
2862
2863         rcu_read_lock();
2864         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2865         rcu_read_unlock();
2866         switch (after_sb_2p) {
2867         case ASB_DISCARD_YOUNGER_PRI:
2868         case ASB_DISCARD_OLDER_PRI:
2869         case ASB_DISCARD_LEAST_CHG:
2870         case ASB_DISCARD_LOCAL:
2871         case ASB_DISCARD_REMOTE:
2872         case ASB_CONSENSUS:
2873         case ASB_DISCARD_SECONDARY:
2874         case ASB_DISCARD_ZERO_CHG:
2875                 drbd_err(device, "Configuration error.\n");
2876                 break;
2877         case ASB_VIOLENTLY:
2878                 rv = drbd_asb_recover_0p(peer_device);
2879                 break;
2880         case ASB_DISCONNECT:
2881                 break;
2882         case ASB_CALL_HELPER:
2883                 hg = drbd_asb_recover_0p(peer_device);
2884                 if (hg == -1) {
2885                         enum drbd_state_rv rv2;
2886
2887                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2888                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2889                           * we do not need to wait for the after state change work either. */
2890                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2891                         if (rv2 != SS_SUCCESS) {
2892                                 drbd_khelper(device, "pri-lost-after-sb");
2893                         } else {
2894                                 drbd_warn(device, "Successfully gave up primary role.\n");
2895                                 rv = hg;
2896                         }
2897                 } else
2898                         rv = hg;
2899         }
2900
2901         return rv;
2902 }
2903
2904 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2905                            u64 bits, u64 flags)
2906 {
2907         if (!uuid) {
2908                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2909                 return;
2910         }
2911         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2912              text,
2913              (unsigned long long)uuid[UI_CURRENT],
2914              (unsigned long long)uuid[UI_BITMAP],
2915              (unsigned long long)uuid[UI_HISTORY_START],
2916              (unsigned long long)uuid[UI_HISTORY_END],
2917              (unsigned long long)bits,
2918              (unsigned long long)flags);
2919 }
2920
2921 /*
2922   100   after split brain try auto recover
2923     2   C_SYNC_SOURCE set BitMap
2924     1   C_SYNC_SOURCE use BitMap
2925     0   no Sync
2926    -1   C_SYNC_TARGET use BitMap
2927    -2   C_SYNC_TARGET set BitMap
2928  -100   after split brain, disconnect
2929 -1000   unrelated data
2930 -1091   requires proto 91
2931 -1096   requires proto 96
2932  */
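/* Note on the masking below: the lowest bit of a UUID is used as a flag
 * (the after-split-brain code above tests uuid[UI_BITMAP] & 1), so the
 * comparisons strip it with & ~((u64)1); two UUIDs that differ only in
 * that bit are treated as equal. */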
2933 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2934 {
2935         struct drbd_peer_device *const peer_device = first_peer_device(device);
2936         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2937         u64 self, peer;
2938         int i, j;
2939
2940         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2941         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2942
2943         *rule_nr = 10;
2944         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2945                 return 0;
2946
2947         *rule_nr = 20;
2948         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2949              peer != UUID_JUST_CREATED)
2950                 return -2;
2951
2952         *rule_nr = 30;
2953         if (self != UUID_JUST_CREATED &&
2954             (peer == UUID_JUST_CREATED || peer == (u64)0))
2955                 return 2;
2956
2957         if (self == peer) {
2958                 int rct, dc; /* roles at crash time */
2959
2960                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2961
2962                         if (connection->agreed_pro_version < 91)
2963                                 return -1091;
2964
2965                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2966                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2967                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2968                                 drbd_uuid_move_history(device);
2969                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2970                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2971
2972                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2973                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2974                                 *rule_nr = 34;
2975                         } else {
2976                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2977                                 *rule_nr = 36;
2978                         }
2979
2980                         return 1;
2981                 }
2982
2983                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2984
2985                         if (connection->agreed_pro_version < 91)
2986                                 return -1091;
2987
2988                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2989                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2990                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2991
2992                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2993                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2994                                 device->p_uuid[UI_BITMAP] = 0UL;
2995
2996                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2997                                 *rule_nr = 35;
2998                         } else {
2999                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3000                                 *rule_nr = 37;
3001                         }
3002
3003                         return -1;
3004                 }
3005
3006                 /* Common power [off|failure] */
3007                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3008                         (device->p_uuid[UI_FLAGS] & 2);
3009                 /* lowest bit is set when we were primary,
3010                  * next bit (weight 2) is set when peer was primary */
3011                 *rule_nr = 40;
3012
3013                 switch (rct) {
3014                 case 0: /* !self_pri && !peer_pri */ return 0;
3015                 case 1: /*  self_pri && !peer_pri */ return 1;
3016                 case 2: /* !self_pri &&  peer_pri */ return -1;
3017                 case 3: /*  self_pri &&  peer_pri */
3018                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3019                         return dc ? -1 : 1;
3020                 }
3021         }
3022
3023         *rule_nr = 50;
3024         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3025         if (self == peer)
3026                 return -1;
3027
3028         *rule_nr = 51;
3029         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3030         if (self == peer) {
3031                 if (connection->agreed_pro_version < 96 ?
3032                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3033                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3034                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3035                         /* The last P_SYNC_UUID did not get through.  Undo the UUID
3036                            modifications the peer made when it last started a resync as sync source. */
3037
3038                         if (connection->agreed_pro_version < 91)
3039                                 return -1091;
3040
3041                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3042                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3043
3044                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3045                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3046
3047                         return -1;
3048                 }
3049         }
3050
3051         *rule_nr = 60;
3052         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3053         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3054                 peer = device->p_uuid[i] & ~((u64)1);
3055                 if (self == peer)
3056                         return -2;
3057         }
3058
3059         *rule_nr = 70;
3060         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3061         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3062         if (self == peer)
3063                 return 1;
3064
3065         *rule_nr = 71;
3066         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3067         if (self == peer) {
3068                 if (connection->agreed_pro_version < 96 ?
3069                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3070                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3071                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3072                         /* The last P_SYNC_UUID did not get through.  Undo the UUID
3073                            modifications we made when we last started a resync as sync source. */
3074
3075                         if (connection->agreed_pro_version < 91)
3076                                 return -1091;
3077
3078                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3079                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3080
3081                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3082                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3083                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3084
3085                         return 1;
3086                 }
3087         }
3088
3089
3090         *rule_nr = 80;
3091         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3092         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3093                 self = device->ldev->md.uuid[i] & ~((u64)1);
3094                 if (self == peer)
3095                         return 2;
3096         }
3097
3098         *rule_nr = 90;
3099         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3100         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3101         if (self == peer && self != ((u64)0))
3102                 return 100;
3103
3104         *rule_nr = 100;
3105         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3106                 self = device->ldev->md.uuid[i] & ~((u64)1);
3107                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3108                         peer = device->p_uuid[j] & ~((u64)1);
3109                         if (self == peer)
3110                                 return -100;
3111                 }
3112         }
3113
3114         return -1000;
3115 }
3116
3117 /* drbd_sync_handshake() returns the new conn state on success, or
3118    C_MASK on failure.
3119  */
3120 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3121                                            enum drbd_role peer_role,
3122                                            enum drbd_disk_state peer_disk) __must_hold(local)
3123 {
3124         struct drbd_device *device = peer_device->device;
3125         enum drbd_conns rv = C_MASK;
3126         enum drbd_disk_state mydisk;
3127         struct net_conf *nc;
3128         int hg, rule_nr, rr_conflict, tentative;
3129
3130         mydisk = device->state.disk;
3131         if (mydisk == D_NEGOTIATING)
3132                 mydisk = device->new_state_tmp.disk;
3133
3134         drbd_info(device, "drbd_sync_handshake:\n");
3135
3136         spin_lock_irq(&device->ldev->md.uuid_lock);
3137         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3138         drbd_uuid_dump(device, "peer", device->p_uuid,
3139                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3140
3141         hg = drbd_uuid_compare(device, &rule_nr);
3142         spin_unlock_irq(&device->ldev->md.uuid_lock);
3143
3144         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3145
3146         if (hg == -1000) {
3147                 drbd_alert(device, "Unrelated data, aborting!\n");
3148                 return C_MASK;
3149         }
3150         if (hg < -1000) {
3151                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3152                 return C_MASK;
3153         }
3154
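        /* If exactly one side is Inconsistent, the disk states dictate the
         * sync direction; doubling hg to +-2 requests a full sync instead of
         * a bitmap-based one (see the rule table above drbd_uuid_compare). */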
3155         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3156             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3157                 int f = (hg == -100) || abs(hg) == 2;
3158                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3159                 if (f)
3160                         hg = hg*2;
3161                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3162                      hg > 0 ? "source" : "target");
3163         }
3164
3165         if (abs(hg) == 100)
3166                 drbd_khelper(device, "initial-split-brain");
3167
3168         rcu_read_lock();
3169         nc = rcu_dereference(peer_device->connection->net_conf);
3170
3171         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3172                 int pcount = (device->state.role == R_PRIMARY)
3173                            + (peer_role == R_PRIMARY);
3174                 int forced = (hg == -100);
3175
3176                 switch (pcount) {
3177                 case 0:
3178                         hg = drbd_asb_recover_0p(peer_device);
3179                         break;
3180                 case 1:
3181                         hg = drbd_asb_recover_1p(peer_device);
3182                         break;
3183                 case 2:
3184                         hg = drbd_asb_recover_2p(peer_device);
3185                         break;
3186                 }
3187                 if (abs(hg) < 100) {
3188                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3189                              "automatically solved. Sync from %s node\n",
3190                              pcount, (hg < 0) ? "peer" : "this");
3191                         if (forced) {
3192                                 drbd_warn(device, "Doing a full sync, since"
3193                                      " UUIDs were ambiguous.\n");
3194                                 hg = hg*2;
3195                         }
3196                 }
3197         }
3198
3199         if (hg == -100) {
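                /* manual resolution: compare our DISCARD_MY_DATA flag against
                 * the peer's (sent in UI_FLAGS bit 0); the split brain resolves
                 * here only if exactly one side has discard-my-data set,
                 * otherwise hg stays -100. */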
3200                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3201                         hg = -1;
3202                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3203                         hg = 1;
3204
3205                 if (abs(hg) < 100)
3206                         drbd_warn(device, "Split-Brain detected, manually solved. "
3207                              "Sync from %s node\n",
3208                              (hg < 0) ? "peer" : "this");
3209         }
3210         rr_conflict = nc->rr_conflict;
3211         tentative = nc->tentative;
3212         rcu_read_unlock();
3213
3214         if (hg == -100) {
3215                 /* FIXME this log message is not correct if we end up here
3216                  * after an attempted attach on a diskless node.
3217                  * We just refuse to attach -- well, we drop the "connection"
3218                  * to that disk, in a way... */
3219                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3220                 drbd_khelper(device, "split-brain");
3221                 return C_MASK;
3222         }
3223
3224         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3225                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3226                 return C_MASK;
3227         }
3228
3229         if (hg < 0 && /* by intention we do not use mydisk here. */
3230             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3231                 switch (rr_conflict) {
3232                 case ASB_CALL_HELPER:
3233                         drbd_khelper(device, "pri-lost");
3234                         /* fall through */
3235                 case ASB_DISCONNECT:
3236                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3237                         return C_MASK;
3238                 case ASB_VIOLENTLY:
3239                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3240                              "assumption\n");
3241                 }
3242         }
3243
3244         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3245                 if (hg == 0)
3246                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3247                 else
3248                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3249                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3250                                  abs(hg) >= 2 ? "full" : "bit-map based");
3251                 return C_MASK;
3252         }
3253
3254         if (abs(hg) >= 2) {
3255                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3256                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3257                                         BM_LOCKED_SET_ALLOWED))
3258                         return C_MASK;
3259         }
3260
3261         if (hg > 0) { /* become sync source. */
3262                 rv = C_WF_BITMAP_S;
3263         } else if (hg < 0) { /* become sync target */
3264                 rv = C_WF_BITMAP_T;
3265         } else {
3266                 rv = C_CONNECTED;
3267                 if (drbd_bm_total_weight(device)) {
3268                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3269                              drbd_bm_total_weight(device));
3270                 }
3271         }
3272
3273         return rv;
3274 }
3275
3276 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3277 {
3278         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3279         if (peer == ASB_DISCARD_REMOTE)
3280                 return ASB_DISCARD_LOCAL;
3281
3282         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3283         if (peer == ASB_DISCARD_LOCAL)
3284                 return ASB_DISCARD_REMOTE;
3285
3286         /* everything else is valid if they are equal on both sides. */
3287         return peer;
3288 }
3289
3290 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3291 {
3292         struct p_protocol *p = pi->data;
3293         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3294         int p_proto, p_discard_my_data, p_two_primaries, cf;
3295         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3296         char integrity_alg[SHARED_SECRET_MAX] = "";
3297         struct crypto_hash *peer_integrity_tfm = NULL;
3298         void *int_dig_in = NULL, *int_dig_vv = NULL;
3299
3300         p_proto         = be32_to_cpu(p->protocol);
3301         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3302         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3303         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3304         p_two_primaries = be32_to_cpu(p->two_primaries);
3305         cf              = be32_to_cpu(p->conn_flags);
3306         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3307
3308         if (connection->agreed_pro_version >= 87) {
3309                 int err;
3310
3311                 if (pi->size > sizeof(integrity_alg))
3312                         return -EIO;
3313                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3314                 if (err)
3315                         return err;
3316                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3317         }
3318
3319         if (pi->cmd != P_PROTOCOL_UPDATE) {
3320                 clear_bit(CONN_DRY_RUN, &connection->flags);
3321
3322                 if (cf & CF_DRY_RUN)
3323                         set_bit(CONN_DRY_RUN, &connection->flags);
3324
3325                 rcu_read_lock();
3326                 nc = rcu_dereference(connection->net_conf);
3327
3328                 if (p_proto != nc->wire_protocol) {
3329                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3330                         goto disconnect_rcu_unlock;
3331                 }
3332
3333                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3334                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3335                         goto disconnect_rcu_unlock;
3336                 }
3337
3338                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3339                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3340                         goto disconnect_rcu_unlock;
3341                 }
3342
3343                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3344                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3345                         goto disconnect_rcu_unlock;
3346                 }
3347
3348                 if (p_discard_my_data && nc->discard_my_data) {
3349                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3350                         goto disconnect_rcu_unlock;
3351                 }
3352
3353                 if (p_two_primaries != nc->two_primaries) {
3354                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3355                         goto disconnect_rcu_unlock;
3356                 }
3357
3358                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3359                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3360                         goto disconnect_rcu_unlock;
3361                 }
3362
3363                 rcu_read_unlock();
3364         }
3365
3366         if (integrity_alg[0]) {
3367                 int hash_size;
3368
3369                 /*
3370                  * We can only change the peer data integrity algorithm
3371                  * here.  Changing our own data integrity algorithm
3372                  * requires that we send a P_PROTOCOL_UPDATE packet at
3373                  * the same time; otherwise, the peer has no way to
3374                  * tell between which packets the algorithm should
3375                  * change.
3376                  */
3377
3378                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3379                 if (!peer_integrity_tfm) {
3380                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3381                                  integrity_alg);
3382                         goto disconnect;
3383                 }
3384
3385                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3386                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3387                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3388                 if (!(int_dig_in && int_dig_vv)) {
3389                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3390                         goto disconnect;
3391                 }
3392         }
3393
3394         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3395         if (!new_net_conf) {
3396                 drbd_err(connection, "Allocation of new net_conf failed\n");
3397                 goto disconnect;
3398         }
3399
3400         mutex_lock(&connection->data.mutex);
3401         mutex_lock(&connection->resource->conf_update);
3402         old_net_conf = connection->net_conf;
3403         *new_net_conf = *old_net_conf;
3404
3405         new_net_conf->wire_protocol = p_proto;
3406         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3407         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3408         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3409         new_net_conf->two_primaries = p_two_primaries;
3410
3411         rcu_assign_pointer(connection->net_conf, new_net_conf);
3412         mutex_unlock(&connection->resource->conf_update);
3413         mutex_unlock(&connection->data.mutex);
3414
3415         crypto_free_hash(connection->peer_integrity_tfm);
3416         kfree(connection->int_dig_in);
3417         kfree(connection->int_dig_vv);
3418         connection->peer_integrity_tfm = peer_integrity_tfm;
3419         connection->int_dig_in = int_dig_in;
3420         connection->int_dig_vv = int_dig_vv;
3421
3422         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3423                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3424                           integrity_alg[0] ? integrity_alg : "(none)");
3425
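        /* classic RCU update pattern: the new net_conf was published with
         * rcu_assign_pointer() above; wait for all readers that may still
         * hold a reference to the old one before freeing it. */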
3426         synchronize_rcu();
3427         kfree(old_net_conf);
3428         return 0;
3429
3430 disconnect_rcu_unlock:
3431         rcu_read_unlock();
3432 disconnect:
3433         crypto_free_hash(peer_integrity_tfm);
3434         kfree(int_dig_in);
3435         kfree(int_dig_vv);
3436         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3437         return -EIO;
3438 }
3439
3440 /* helper function
3441  * input: alg name, feature name
3442  * return: NULL (alg name was "")
3443  *         ERR_PTR(error) if something goes wrong
3444  *         or the crypto hash ptr, if it worked out ok. */
3445 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3446                 const char *alg, const char *name)
3447 {
3448         struct crypto_hash *tfm;
3449
3450         if (!alg[0])
3451                 return NULL;
3452
3453         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3454         if (IS_ERR(tfm)) {
3455                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3456                         alg, name, PTR_ERR(tfm));
3457                 return tfm;
3458         }
3459         return tfm;
3460 }
3461
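/* Drain and discard the pi->size payload bytes still pending on the data
 * socket, in DRBD_SOCKET_BUFFER_SIZE chunks, so the receive stream stays
 * aligned with packet boundaries. */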
3462 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3463 {
3464         void *buffer = connection->data.rbuf;
3465         int size = pi->size;
3466
3467         while (size) {
3468                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3469                 s = drbd_recv(connection, buffer, s);
3470                 if (s <= 0) {
3471                         if (s < 0)
3472                                 return s;
3473                         break;
3474                 }
3475                 size -= s;
3476         }
3477         if (size)
3478                 return -EIO;
3479         return 0;
3480 }
3481
3482 /*
3483  * config_unknown_volume  -  device configuration command for unknown volume
3484  *
3485  * When a device is added to an existing connection, the node on which the
3486  * device is added first will send configuration commands to its peer but the
3487  * peer will not know about the device yet.  It will warn and ignore these
3488  * commands.  Once the device is added on the second node, the second node will
3489  * send the same device configuration commands, but in the other direction.
3490  *
3491  * (We can also end up here if drbd is misconfigured.)
3492  */
3493 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3494 {
3495         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3496                   cmdname(pi->cmd), pi->vnr);
3497         return ignore_remaining_packet(connection, pi);
3498 }
3499
3500 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3501 {
3502         struct drbd_peer_device *peer_device;
3503         struct drbd_device *device;
3504         struct p_rs_param_95 *p;
3505         unsigned int header_size, data_size, exp_max_sz;
3506         struct crypto_hash *verify_tfm = NULL;
3507         struct crypto_hash *csums_tfm = NULL;
3508         struct net_conf *old_net_conf, *new_net_conf = NULL;
3509         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3510         const int apv = connection->agreed_pro_version;
3511         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3512         int fifo_size = 0;
3513         int err;
3514
3515         peer_device = conn_peer_device(connection, pi->vnr);
3516         if (!peer_device)
3517                 return config_unknown_volume(connection, pi);
3518         device = peer_device->device;
3519
3520         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3521                     : apv == 88 ? sizeof(struct p_rs_param)
3522                                         + SHARED_SECRET_MAX
3523                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3524                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3525
3526         if (pi->size > exp_max_sz) {
3527                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3528                     pi->size, exp_max_sz);
3529                 return -EIO;
3530         }
3531
3532         if (apv <= 88) {
3533                 header_size = sizeof(struct p_rs_param);
3534                 data_size = pi->size - header_size;
3535         } else if (apv <= 94) {
3536                 header_size = sizeof(struct p_rs_param_89);
3537                 data_size = pi->size - header_size;
3538                 D_ASSERT(device, data_size == 0);
3539         } else {
3540                 header_size = sizeof(struct p_rs_param_95);
3541                 data_size = pi->size - header_size;
3542                 D_ASSERT(device, data_size == 0);
3543         }
3544
3545         /* initialize verify_alg and csums_alg */
3546         p = pi->data;
3547         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
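        /* this wipes both verify_alg and csums_alg at once; it relies on the
         * two arrays being laid out back to back in the p_rs_param variants,
         * hence the 2 * SHARED_SECRET_MAX. */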
3548
3549         err = drbd_recv_all(peer_device->connection, p, header_size);
3550         if (err)
3551                 return err;
3552
3553         mutex_lock(&connection->resource->conf_update);
3554         old_net_conf = peer_device->connection->net_conf;
3555         if (get_ldev(device)) {
3556                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3557                 if (!new_disk_conf) {
3558                         put_ldev(device);
3559                         mutex_unlock(&connection->resource->conf_update);
3560                         drbd_err(device, "Allocation of new disk_conf failed\n");
3561                         return -ENOMEM;
3562                 }
3563
3564                 old_disk_conf = device->ldev->disk_conf;
3565                 *new_disk_conf = *old_disk_conf;
3566
3567                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3568         }
3569
3570         if (apv >= 88) {
3571                 if (apv == 88) {
3572                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3573                                 drbd_err(device, "verify-alg of wrong size, "
3574                                         "peer wants %u, accepting only up to %u byte\n",
3575                                         data_size, SHARED_SECRET_MAX);
3576                                 err = -EIO;
3577                                 goto reconnect;
3578                         }
3579
3580                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3581                         if (err)
3582                                 goto reconnect;
3583                         /* we expect a NUL-terminated string */
3584                         /* but just in case someone tries to be evil */
3585                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3586                         p->verify_alg[data_size-1] = 0;
3587
3588                 } else /* apv >= 89 */ {
3589                         /* we still expect NUL-terminated strings */
3590                         /* but just in case someone tries to be evil */
3591                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3592                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3593                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3594                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3595                 }
3596
3597                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3598                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3599                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3600                                     old_net_conf->verify_alg, p->verify_alg);
3601                                 goto disconnect;
3602                         }
3603                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3604                                         p->verify_alg, "verify-alg");
3605                         if (IS_ERR(verify_tfm)) {
3606                                 verify_tfm = NULL;
3607                                 goto disconnect;
3608                         }
3609                 }
3610
3611                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3612                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3613                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3614                                     old_net_conf->csums_alg, p->csums_alg);
3615                                 goto disconnect;
3616                         }
3617                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3618                                         p->csums_alg, "csums-alg");
3619                         if (IS_ERR(csums_tfm)) {
3620                                 csums_tfm = NULL;
3621                                 goto disconnect;
3622                         }
3623                 }
3624
3625                 if (apv > 94 && new_disk_conf) {
3626                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3627                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3628                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3629                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3630
3631                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3632                         if (fifo_size != device->rs_plan_s->size) {
3633                                 new_plan = fifo_alloc(fifo_size);
3634                                 if (!new_plan) {
3635                                         drbd_err(device, "kmalloc of fifo_buffer failed\n");
3636                                         put_ldev(device);
3637                                         goto disconnect;
3638                                 }
3639                         }
3640                 }
3641
3642                 if (verify_tfm || csums_tfm) {
3643                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3644                         if (!new_net_conf) {
3645                                 drbd_err(device, "Allocation of new net_conf failed\n");
3646                                 goto disconnect;
3647                         }
3648
3649                         *new_net_conf = *old_net_conf;
3650
3651                         if (verify_tfm) {
3652                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3653                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3654                                 crypto_free_hash(peer_device->connection->verify_tfm);
3655                                 peer_device->connection->verify_tfm = verify_tfm;
3656                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3657                         }
3658                         if (csums_tfm) {
3659                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3660                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3661                                 crypto_free_hash(peer_device->connection->csums_tfm);
3662                                 peer_device->connection->csums_tfm = csums_tfm;
3663                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3664                         }
3665                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3666                 }
3667         }
3668
3669         if (new_disk_conf) {
3670                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3671                 put_ldev(device);
3672         }
3673
3674         if (new_plan) {
3675                 old_plan = device->rs_plan_s;
3676                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3677         }
3678
3679         mutex_unlock(&connection->resource->conf_update);
3680         synchronize_rcu();
3681         if (new_net_conf)
3682                 kfree(old_net_conf);
3683         kfree(old_disk_conf);
3684         kfree(old_plan);
3685
3686         return 0;
3687
3688 reconnect:
3689         if (new_disk_conf) {
3690                 put_ldev(device);
3691                 kfree(new_disk_conf);
3692         }
3693         mutex_unlock(&connection->resource->conf_update);
3694         return -EIO;
3695
3696 disconnect:
3697         kfree(new_plan);
3698         if (new_disk_conf) {
3699                 put_ldev(device);
3700                 kfree(new_disk_conf);
3701         }
3702         mutex_unlock(&connection->resource->conf_update);
3703         /* just for completeness: actually not needed,
3704          * as this is not reached if csums_tfm was ok. */
3705         crypto_free_hash(csums_tfm);
3706         /* but free the verify_tfm again, if csums_tfm did not work out */
3707         crypto_free_hash(verify_tfm);
3708         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3709         return -EIO;
3710 }
3711
3712 /* warn if the arguments differ by more than 12.5% */
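/* e.g. a = 1000 sectors, b = 1150: d = 150 > (a >> 3) = 125, so we warn;
 * exceeding 1/8 of the smaller value is the 12.5% threshold. */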
3713 static void warn_if_differ_considerably(struct drbd_device *device,
3714         const char *s, sector_t a, sector_t b)
3715 {
3716         sector_t d;
3717         if (a == 0 || b == 0)
3718                 return;
3719         d = (a > b) ? (a - b) : (b - a);
3720         if (d > (a>>3) || d > (b>>3))
3721                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3722                      (unsigned long long)a, (unsigned long long)b);
3723 }
3724
3725 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3726 {
3727         struct drbd_peer_device *peer_device;
3728         struct drbd_device *device;
3729         struct p_sizes *p = pi->data;
3730         enum determine_dev_size dd = DS_UNCHANGED;
3731         sector_t p_size, p_usize, p_csize, my_usize;
3732         int ldsc = 0; /* local disk size changed */
3733         enum dds_flags ddsf;
3734
3735         peer_device = conn_peer_device(connection, pi->vnr);
3736         if (!peer_device)
3737                 return config_unknown_volume(connection, pi);
3738         device = peer_device->device;
3739
3740         p_size = be64_to_cpu(p->d_size);
3741         p_usize = be64_to_cpu(p->u_size);
3742         p_csize = be64_to_cpu(p->c_size);
3743
3744         /* just store the peer's disk size for now.
3745          * we still need to figure out whether we accept that. */
3746         device->p_size = p_size;
3747
3748         if (get_ldev(device)) {
3749                 rcu_read_lock();
3750                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3751                 rcu_read_unlock();
3752
3753                 warn_if_differ_considerably(device, "lower level device sizes",
3754                            p_size, drbd_get_max_capacity(device->ldev));
3755                 warn_if_differ_considerably(device, "user requested size",
3756                                             p_usize, my_usize);
3757
3758                 /* if this is the first connect, or an otherwise expected
3759                  * param exchange, choose the minimum */
3760                 if (device->state.conn == C_WF_REPORT_PARAMS)
3761                         p_usize = min_not_zero(my_usize, p_usize);
3762
3763                 /* Never shrink a device with usable data during connect.
3764                    But allow online shrinking if we are connected. */
3765                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3766                     drbd_get_capacity(device->this_bdev) &&
3767                     device->state.disk >= D_OUTDATED &&
3768                     device->state.conn < C_CONNECTED) {
3769                         drbd_err(device, "The peer's disk size is too small!\n");
3770                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3771                         put_ldev(device);
3772                         return -EIO;
3773                 }
3774
3775                 if (my_usize != p_usize) {
3776                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3777
3778                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3779                         if (!new_disk_conf) {
3780                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3781                                 put_ldev(device);
3782                                 return -ENOMEM;
3783                         }
3784
3785                         mutex_lock(&connection->resource->conf_update);
3786                         old_disk_conf = device->ldev->disk_conf;
3787                         *new_disk_conf = *old_disk_conf;
3788                         new_disk_conf->disk_size = p_usize;
3789
3790                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3791                         mutex_unlock(&connection->resource->conf_update);
3792                         synchronize_rcu();
3793                         kfree(old_disk_conf);
3794
3795                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3796                                  (unsigned long)p_usize);
3797                 }
3798
3799                 put_ldev(device);
3800         }
3801
3802         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3803         /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3804            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3805            drbd_reconsider_max_bio_size(), we can be sure that after
3806            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3807
3808         ddsf = be16_to_cpu(p->dds_flags);
3809         if (get_ldev(device)) {
3810                 drbd_reconsider_max_bio_size(device, device->ldev);
3811                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3812                 put_ldev(device);
3813                 if (dd == DS_ERROR)
3814                         return -EIO;
3815                 drbd_md_sync(device);
3816         } else {
3817                 /*
3818                  * I am diskless, need to accept the peer's *current* size.
3819                  * I must NOT accept the peer's backing disk size,
3820                  * it may have been larger than mine all along...
3821                  *
3822                  * At this point, the peer knows more about my disk, or at
3823                  * least about what we last agreed upon, than myself.
3824                  * So if his c_size is less than his d_size, the most likely
3825                  * reason is that *my* d_size was smaller last time we checked.
3826                  *
3827                  * However, if he sends a zero current size,
3828                  * take his (user-capped or) backing disk size anyways.
3829                  */
3830                  * take his (user-capped or) backing disk size anyway.
3831                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3832         }
3833
3834         if (get_ldev(device)) {
3835                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3836                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3837                         ldsc = 1;
3838                 }
3839
3840                 put_ldev(device);
3841         }
3842
3843         if (device->state.conn > C_WF_REPORT_PARAMS) {
3844                 if (be64_to_cpu(p->c_size) !=
3845                     drbd_get_capacity(device->this_bdev) || ldsc) {
3846                         /* we have different sizes, probably peer
3847                          * needs to know my new size... */
3848                         drbd_send_sizes(peer_device, 0, ddsf);
3849                 }
3850                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3851                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3852                         if (device->state.pdsk >= D_INCONSISTENT &&
3853                             device->state.disk >= D_INCONSISTENT) {
3854                                 if (ddsf & DDSF_NO_RESYNC)
3855                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3856                                 else
3857                                         resync_after_online_grow(device);
3858                         } else
3859                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3860                 }
3861         }
3862
3863         return 0;
3864 }
3865
3866 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3867 {
3868         struct drbd_peer_device *peer_device;
3869         struct drbd_device *device;
3870         struct p_uuids *p = pi->data;
3871         u64 *p_uuid;
3872         int i, updated_uuids = 0;
3873
3874         peer_device = conn_peer_device(connection, pi->vnr);
3875         if (!peer_device)
3876                 return config_unknown_volume(connection, pi);
3877         device = peer_device->device;
3878
3879         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3880         if (!p_uuid) {
3881                 drbd_err(device, "kmalloc of p_uuid failed\n");
3882                 return -ENOMEM;
3883         }
3884
3885         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3886                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3887
3888         kfree(device->p_uuid);
3889         device->p_uuid = p_uuid;
3890
3891         if (device->state.conn < C_CONNECTED &&
3892             device->state.disk < D_INCONSISTENT &&
3893             device->state.role == R_PRIMARY &&
3894             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3895                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3896                     (unsigned long long)device->ed_uuid);
3897                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3898                 return -EIO;
3899         }
3900
3901         if (get_ldev(device)) {
3902                 int skip_initial_sync =
3903                         device->state.conn == C_CONNECTED &&
3904                         peer_device->connection->agreed_pro_version >= 90 &&
3905                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3906                         (p_uuid[UI_FLAGS] & 8);
3907                 if (skip_initial_sync) {
3908                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3909                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3910                                         "clear_n_write from receive_uuids",
3911                                         BM_LOCKED_TEST_ALLOWED);
3912                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3913                         _drbd_uuid_set(device, UI_BITMAP, 0);
3914                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3915                                         CS_VERBOSE, NULL);
3916                         drbd_md_sync(device);
3917                         updated_uuids = 1;
3918                 }
3919                 put_ldev(device);
3920         } else if (device->state.disk < D_INCONSISTENT &&
3921                    device->state.role == R_PRIMARY) {
3922                 /* I am a diskless primary, the peer just created a new current UUID
3923                    for me. */
3924                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3925         }
3926
3927         /* Before we test the disk state, we should wait until any ongoing
3928            cluster-wide state change has finished. That is important if
3929            we are primary and are detaching from our disk: we need to see the
3930            new disk state... */
3931         mutex_lock(device->state_mutex);
3932         mutex_unlock(device->state_mutex);
3933         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3934                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3935
3936         if (updated_uuids)
3937                 drbd_print_uuids(device, "receiver updated UUIDs to");
3938
3939         return 0;
3940 }
3941
3942 /**
3943  * convert_state() - Converts the peer's view of the cluster state to our point of view
3944  * @ps:         The state as seen by the peer.
3945  */
3946 static union drbd_state convert_state(union drbd_state ps)
3947 {
3948         union drbd_state ms;
3949
3950         static enum drbd_conns c_tab[] = {
3951                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3952                 [C_CONNECTED] = C_CONNECTED,
3953
3954                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3955                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3956                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3957                 [C_VERIFY_S]       = C_VERIFY_T,
3958                 [C_MASK]   = C_MASK,
3959         };
3960
3961         ms.i = ps.i;
3962
3963         ms.conn = c_tab[ps.conn];
3964         ms.peer = ps.role;
3965         ms.role = ps.peer;
3966         ms.pdsk = ps.disk;
3967         ms.disk = ps.pdsk;
3968         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3969
3970         return ms;
3971 }
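/*
 * For example: if the peer reports { role = Primary, peer = Secondary,
 * disk = UpToDate, pdsk = Inconsistent }, the converted local view is
 * { role = Secondary, peer = Primary, disk = Inconsistent, pdsk = UpToDate };
 * role/peer and disk/pdsk swap, and asymmetric connection states are
 * mirrored via c_tab[] (e.g. C_STARTING_SYNC_S becomes C_STARTING_SYNC_T).
 */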
3972
3973 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3974 {
3975         struct drbd_peer_device *peer_device;
3976         struct drbd_device *device;
3977         struct p_req_state *p = pi->data;
3978         union drbd_state mask, val;
3979         enum drbd_state_rv rv;
3980
3981         peer_device = conn_peer_device(connection, pi->vnr);
3982         if (!peer_device)
3983                 return -EIO;
3984         device = peer_device->device;
3985
3986         mask.i = be32_to_cpu(p->mask);
3987         val.i = be32_to_cpu(p->val);
3988
3989         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3990             mutex_is_locked(device->state_mutex)) {
3991                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3992                 return 0;
3993         }
3994
3995         mask = convert_state(mask);
3996         val = convert_state(val);
3997
3998         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
3999         drbd_send_sr_reply(peer_device, rv);
4000
4001         drbd_md_sync(device);
4002
4003         return 0;
4004 }
4005
4006 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4007 {
4008         struct p_req_state *p = pi->data;
4009         union drbd_state mask, val;
4010         enum drbd_state_rv rv;
4011
4012         mask.i = be32_to_cpu(p->mask);
4013         val.i = be32_to_cpu(p->val);
4014
4015         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4016             mutex_is_locked(&connection->cstate_mutex)) {
4017                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4018                 return 0;
4019         }
4020
4021         mask = convert_state(mask);
4022         val = convert_state(val);
4023
4024         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4025         conn_send_sr_reply(connection, rv);
4026
4027         return 0;
4028 }
4029
4030 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4031 {
4032         struct drbd_peer_device *peer_device;
4033         struct drbd_device *device;
4034         struct p_state *p = pi->data;
4035         union drbd_state os, ns, peer_state;
4036         enum drbd_disk_state real_peer_disk;
4037         enum chg_state_flags cs_flags;
4038         int rv;
4039
4040         peer_device = conn_peer_device(connection, pi->vnr);
4041         if (!peer_device)
4042                 return config_unknown_volume(connection, pi);
4043         device = peer_device->device;
4044
4045         peer_state.i = be32_to_cpu(p->state);
4046
4047         real_peer_disk = peer_state.disk;
4048         if (peer_state.disk == D_NEGOTIATING) {
4049                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4050                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4051         }
4052
4053         spin_lock_irq(&device->resource->req_lock);
4054  retry:
4055         os = ns = drbd_read_state(device);
4056         spin_unlock_irq(&device->resource->req_lock);
4057
4058         /* If some other part of the code (asender thread, timeout)
4059          * already decided to close the connection again,
4060          * we must not "re-establish" it here. */
4061         if (os.conn <= C_TEAR_DOWN)
4062                 return -ECONNRESET;
4063
4064         /* If this is the "end of sync" confirmation, usually the peer disk
4065          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4066          * set) resync started in PausedSyncT, or if the timing of pause-/
4067          * unpause-sync events has been "just right", the peer disk may
4068          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4069          */
4070         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4071             real_peer_disk == D_UP_TO_DATE &&
4072             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4073                 /* If we are (becoming) SyncSource, but peer is still in sync
4074                  * preparation, ignore its uptodate-ness to avoid flapping, it
4075                  * will change to inconsistent once the peer reaches active
4076                  * syncing states.
4077                  * It may have changed syncer-paused flags, however, so we
4078                  * cannot ignore this completely. */
4079                 if (peer_state.conn > C_CONNECTED &&
4080                     peer_state.conn < C_SYNC_SOURCE)
4081                         real_peer_disk = D_INCONSISTENT;
4082
4083                 /* if peer_state changes to connected at the same time,
4084                  * it explicitly notifies us that it finished resync.
4085                  * Maybe we should finish it up, too? */
4086                 else if (os.conn >= C_SYNC_SOURCE &&
4087                          peer_state.conn == C_CONNECTED) {
4088                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4089                                 drbd_resync_finished(device);
4090                         return 0;
4091                 }
4092         }
4093
4094         /* explicit verify finished notification, stop sector reached. */
4095         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4096             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4097                 ov_out_of_sync_print(device);
4098                 drbd_resync_finished(device);
4099                 return 0;
4100         }
4101
4102         /* The peer says its disk is inconsistent, while we think it is up to date,
4103          * and this happens while the peer still thinks we have a sync going on,
4104          * but we think we are already done with the sync.
4105          * We ignore this to avoid flapping pdsk.
4106          * This should not happen if the peer is a recent version of drbd. */
4107         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4108             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4109                 real_peer_disk = D_UP_TO_DATE;
4110
4111         if (ns.conn == C_WF_REPORT_PARAMS)
4112                 ns.conn = C_CONNECTED;
4113
4114         if (peer_state.conn == C_AHEAD)
4115                 ns.conn = C_BEHIND;
4116
4117         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4118             get_ldev_if_state(device, D_NEGOTIATING)) {
4119                 int cr; /* consider resync */
4120
4121                 /* if we established a new connection */
4122                 cr  = (os.conn < C_CONNECTED);
4123                 /* if we had an established connection
4124                  * and one of the nodes newly attaches a disk */
4125                 cr |= (os.conn == C_CONNECTED &&
4126                        (peer_state.disk == D_NEGOTIATING ||
4127                         os.disk == D_NEGOTIATING));
4128                 /* if we have both been inconsistent, and the peer has been
4129                  * forced to be UpToDate with --overwrite-data */
4130                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4131                 /* if we had been plain connected, and the admin requested to
4132                  * start a sync by "invalidate" or "invalidate-remote" */
4133                 cr |= (os.conn == C_CONNECTED &&
4134                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4135                                  peer_state.conn <= C_WF_BITMAP_T));
4136
4137                 if (cr)
4138                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4139
4140                 put_ldev(device);
4141                 if (ns.conn == C_MASK) {
4142                         ns.conn = C_CONNECTED;
4143                         if (device->state.disk == D_NEGOTIATING) {
4144                                 drbd_force_state(device, NS(disk, D_FAILED));
4145                         } else if (peer_state.disk == D_NEGOTIATING) {
4146                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4147                                 peer_state.disk = D_DISKLESS;
4148                                 real_peer_disk = D_DISKLESS;
4149                         } else {
4150                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4151                                         return -EIO;
4152                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4153                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154                                 return -EIO;
4155                         }
4156                 }
4157         }
4158
4159         spin_lock_irq(&device->resource->req_lock);
4160         if (os.i != drbd_read_state(device).i)
4161                 goto retry;
4162         clear_bit(CONSIDER_RESYNC, &device->flags);
4163         ns.peer = peer_state.role;
4164         ns.pdsk = real_peer_disk;
4165         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4166         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4167                 ns.disk = device->new_state_tmp.disk;
4168         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4169         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4170             test_bit(NEW_CUR_UUID, &device->flags)) {
4171                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4172                    for temporary network outages! */
4173                 spin_unlock_irq(&device->resource->req_lock);
4174                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4175                 tl_clear(peer_device->connection);
4176                 drbd_uuid_new_current(device);
4177                 clear_bit(NEW_CUR_UUID, &device->flags);
4178                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4179                 return -EIO;
4180         }
4181         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4182         ns = drbd_read_state(device);
4183         spin_unlock_irq(&device->resource->req_lock);
4184
4185         if (rv < SS_SUCCESS) {
4186                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4187                 return -EIO;
4188         }
4189
4190         if (os.conn > C_WF_REPORT_PARAMS) {
4191                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4192                     peer_state.disk != D_NEGOTIATING) {
4193                         /* we want resync, peer has not yet decided to sync... */
4194                         /* Nowadays only used when forcing a node into primary role and
4195                            setting its disk to UpToDate with that */
4196                         drbd_send_uuids(peer_device);
4197                         drbd_send_current_state(peer_device);
4198                 }
4199         }
4200
4201         clear_bit(DISCARD_MY_DATA, &device->flags);
4202
4203         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4204
4205         return 0;
4206 }
4207
4208 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4209 {
4210         struct drbd_peer_device *peer_device;
4211         struct drbd_device *device;
4212         struct p_rs_uuid *p = pi->data;
4213
4214         peer_device = conn_peer_device(connection, pi->vnr);
4215         if (!peer_device)
4216                 return -EIO;
4217         device = peer_device->device;
4218
4219         wait_event(device->misc_wait,
4220                    device->state.conn == C_WF_SYNC_UUID ||
4221                    device->state.conn == C_BEHIND ||
4222                    device->state.conn < C_CONNECTED ||
4223                    device->state.disk < D_NEGOTIATING);
4224
4225         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4226
4227         /* Here the _drbd_uuid_ functions are right, current should
4228            _not_ be rotated into the history */
4229         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4230                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4231                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4232
4233                 drbd_print_uuids(device, "updated sync uuid");
4234                 drbd_start_resync(device, C_SYNC_TARGET);
4235
4236                 put_ldev(device);
4237         } else
4238                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4239
4240         return 0;
4241 }
4242
4243 /**
4244  * receive_bitmap_plain() - receive one plain (uncompressed) chunk of the peer's bitmap
4245  *
4246  * Return 0 when done, 1 when another iteration is needed, and a negative error
4247  * code upon failure.
4248  */
4249 static int
4250 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4251                      unsigned long *p, struct bm_xfer_ctx *c)
4252 {
4253         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4254                                  drbd_header_size(peer_device->connection);
4255         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4256                                        c->bm_words - c->word_offset);
4257         unsigned int want = num_words * sizeof(*p);
4258         int err;
4259
4260         if (want != size) {
4261                 drbd_err(peer_device, "%s: want (%u) != size (%u)\n", __func__, want, size);
4262                 return -EIO;
4263         }
4264         if (want == 0)
4265                 return 0;
4266         err = drbd_recv_all(peer_device->connection, p, want);
4267         if (err)
4268                 return err;
4269
4270         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4271
4272         c->word_offset += num_words;
4273         c->bit_offset = c->word_offset * BITS_PER_LONG;
4274         if (c->bit_offset > c->bm_bits)
4275                 c->bit_offset = c->bm_bits;
4276
4277         return 1;
4278 }
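/*
 * Each plain P_BITMAP packet thus carries at most DRBD_SOCKET_BUFFER_SIZE
 * minus the header size worth of little-endian longs; they are merged into
 * the local bitmap at the running word offset, and the sender is expected
 * to keep streaming packets until the whole bitmap has been covered.
 */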
4279
4280 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4281 {
4282         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4283 }
4284
4285 static int dcbp_get_start(struct p_compressed_bm *p)
4286 {
4287         return (p->encoding & 0x80) != 0;
4288 }
4289
4290 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4291 {
4292         return (p->encoding >> 4) & 0x7;
4293 }
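/*
 * The three accessors above decode the single "encoding" byte of a
 * compressed bitmap packet: bits 0-3 carry the bitmap code (enum
 * drbd_bitmap_code), bits 4-6 the number of pad bits in the last byte,
 * and bit 7 the polarity of the first run (set vs. cleared bits).
 */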
4294
4295 /**
4296  * recv_bm_rle_bits
4297  * recv_bm_rle_bits() - decode one RLE/VLI-compressed chunk of the peer's bitmap
4298  * Return 0 when done, 1 when another iteration is needed, and a negative error
4299  * code upon failure.
4300  */
4301 static int
4302 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4303                 struct p_compressed_bm *p,
4304                  struct bm_xfer_ctx *c,
4305                  unsigned int len)
4306 {
4307         struct bitstream bs;
4308         u64 look_ahead;
4309         u64 rl;
4310         u64 tmp;
4311         unsigned long s = c->bit_offset;
4312         unsigned long e;
4313         int toggle = dcbp_get_start(p);
4314         int have;
4315         int bits;
4316
4317         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4318
4319         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4320         if (bits < 0)
4321                 return -EIO;
4322
4323         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4324                 bits = vli_decode_bits(&rl, look_ahead);
4325                 if (bits <= 0)
4326                         return -EIO;
4327
4328                 if (toggle) {
4329                         e = s + rl - 1;
4330                         if (e >= c->bm_bits) {
4331                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4332                                 return -EIO;
4333                         }
4334                         _drbd_bm_set_bits(peer_device->device, s, e);
4335                 }
4336
4337                 if (have < bits) {
4338                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4339                                 have, bits, look_ahead,
4340                                 (unsigned int)(bs.cur.b - p->code),
4341                                 (unsigned int)bs.buf_len);
4342                         return -EIO;
4343                 }
4344                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4345                 if (likely(bits < 64))
4346                         look_ahead >>= bits;
4347                 else
4348                         look_ahead = 0;
4349                 have -= bits;
4350
4351                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4352                 if (bits < 0)
4353                         return -EIO;
4354                 look_ahead |= tmp << have;
4355                 have += bits;
4356         }
4357
4358         c->bit_offset = s;
4359         bm_xfer_ctx_bit_to_word_offset(c);
4360
4361         return (s != c->bm_bits);
4362 }
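/*
 * The stream decoded above is a sequence of VLI-encoded run lengths with
 * strictly alternating polarity: starting with dcbp_get_start(), each run
 * either sets the next "rl" bits in the bitmap or skips over them. A 64-bit
 * look-ahead window is refilled from the bitstream after every run, so a
 * run length may straddle the internal word boundaries of the packet.
 */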
4363
4364 /**
4365  * decode_bitmap_c
4366  * decode_bitmap_c() - dispatch a compressed bitmap packet by its encoding
4367  * Return 0 when done, 1 when another iteration is needed, and a negative error
4368  * code upon failure.
4369  */
4370 static int
4371 decode_bitmap_c(struct drbd_peer_device *peer_device,
4372                 struct p_compressed_bm *p,
4373                 struct bm_xfer_ctx *c,
4374                 unsigned int len)
4375 {
4376         if (dcbp_get_code(p) == RLE_VLI_Bits)
4377                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4378
4379         /* other variants had been implemented for evaluation,
4380          * but have been dropped as this one turned out to be "best"
4381          * during all our tests. */
4382
4383         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4384         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4385         return -EIO;
4386 }
4387
4388 void INFO_bm_xfer_stats(struct drbd_device *device,
4389                 const char *direction, struct bm_xfer_ctx *c)
4390 {
4391         /* what would it take to transfer it "plaintext" */
4392         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4393         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4394         unsigned int plain =
4395                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4396                 c->bm_words * sizeof(unsigned long);
4397         unsigned int total = c->bytes[0] + c->bytes[1];
4398         unsigned int r;
4399
4400         /* total cannot be zero, but just in case: */
4401         if (total == 0)
4402                 return;
4403
4404         /* don't report if not compressed */
4405         if (total >= plain)
4406                 return;
4407
4408         /* total < plain. check for overflow, still */
4409         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4410                                     : (1000 * total / plain);
4411
4412         if (r > 1000)
4413                 r = 1000;
4414
4415         r = 1000 - r;
4416         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4417              "total %u; compression: %u.%u%%\n",
4418                         direction,
4419                         c->bytes[1], c->packets[1],
4420                         c->bytes[0], c->packets[0],
4421                         total, r/10, r % 10);
4422 }
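/*
 * Worked example of the ratio computed above: with plain = 1000000 bytes
 * and total = 25000 bytes actually transferred, r becomes
 * 1000 - 1000 * 25000 / 1000000 = 975, printed as "compression: 97.5%".
 */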
4423
4424 /* Since we are processing the bitfield from lower addresses to higher,
4425    it does not matter whether we process it in 32 bit or 64 bit chunks,
4426    as long as it is little endian. (Understand it as a byte stream,
4427    beginning with the lowest byte...) If we used big endian, we would
4428    have to process it from the highest address to the lowest in order
4429    to stay agnostic to the 32 vs 64 bit issue.
4430
4431    Returns 0 on success, or a negative error code on failure. */
4432 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4433 {
4434         struct drbd_peer_device *peer_device;
4435         struct drbd_device *device;
4436         struct bm_xfer_ctx c;
4437         int err;
4438
4439         peer_device = conn_peer_device(connection, pi->vnr);
4440         if (!peer_device)
4441                 return -EIO;
4442         device = peer_device->device;
4443
4444         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4445         /* you are supposed to send additional out-of-sync information
4446          * if you actually set bits during this phase */
4447
4448         c = (struct bm_xfer_ctx) {
4449                 .bm_bits = drbd_bm_bits(device),
4450                 .bm_words = drbd_bm_words(device),
4451         };
4452
4453         for(;;) {
4454                 if (pi->cmd == P_BITMAP)
4455                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4456                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4457                         /* MAYBE: sanity check that we speak proto >= 90,
4458                          * and the feature is enabled! */
4459                         struct p_compressed_bm *p = pi->data;
4460
4461                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4462                                 drbd_err(device, "ReportCBitmap packet too large\n");
4463                                 err = -EIO;
4464                                 goto out;
4465                         }
4466                         if (pi->size <= sizeof(*p)) {
4467                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4468                                 err = -EIO;
4469                                 goto out;
4470                         }
4471                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4472                         if (err)
4473                                 goto out;
4474                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4475                 } else {
4476                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4477                         err = -EIO;
4478                         goto out;
4479                 }
4480
4481                 c.packets[pi->cmd == P_BITMAP]++;
4482                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4483
4484                 if (err <= 0) {
4485                         if (err < 0)
4486                                 goto out;
4487                         break;
4488                 }
4489                 err = drbd_recv_header(peer_device->connection, pi);
4490                 if (err)
4491                         goto out;
4492         }
4493
4494         INFO_bm_xfer_stats(device, "receive", &c);
4495
4496         if (device->state.conn == C_WF_BITMAP_T) {
4497                 enum drbd_state_rv rv;
4498
4499                 err = drbd_send_bitmap(device);
4500                 if (err)
4501                         goto out;
4502                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4503                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4504                 D_ASSERT(device, rv == SS_SUCCESS);
4505         } else if (device->state.conn != C_WF_BITMAP_S) {
4506                 /* admin may have requested C_DISCONNECTING,
4507                  * other threads may have noticed network errors */
4508                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4509                     drbd_conn_str(device->state.conn));
4510         }
4511         err = 0;
4512
4513  out:
4514         drbd_bm_unlock(device);
4515         if (!err && device->state.conn == C_WF_BITMAP_S)
4516                 drbd_start_resync(device, C_SYNC_SOURCE);
4517         return err;
4518 }
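/*
 * Note the asymmetric follow-up above: the sync target (C_WF_BITMAP_T)
 * answers with its own bitmap and then requests C_WF_SYNC_UUID, while the
 * sync source (C_WF_BITMAP_S) starts the actual resync once the bitmap
 * exchange has completed without error and the bitmap lock is dropped.
 */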
4519
4520 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4521 {
4522         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4523                  pi->cmd, pi->size);
4524
4525         return ignore_remaining_packet(connection, pi);
4526 }
4527
4528 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4529 {
4530         /* Make sure we've acked all the TCP data associated
4531          * with the data requests being unplugged */
4532         drbd_tcp_quickack(connection->data.socket);
4533
4534         return 0;
4535 }
4536
4537 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4538 {
4539         struct drbd_peer_device *peer_device;
4540         struct drbd_device *device;
4541         struct p_block_desc *p = pi->data;
4542
4543         peer_device = conn_peer_device(connection, pi->vnr);
4544         if (!peer_device)
4545                 return -EIO;
4546         device = peer_device->device;
4547
4548         switch (device->state.conn) {
4549         case C_WF_SYNC_UUID:
4550         case C_WF_BITMAP_T:
4551         case C_BEHIND:
4552                 break;
4553         default:
4554                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4555                                 drbd_conn_str(device->state.conn));
4556         }
4557
4558         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4559
4560         return 0;
4561 }
4562
4563 struct data_cmd {
4564         int expect_payload;
4565         size_t pkt_size;
4566         int (*fn)(struct drbd_connection *, struct packet_info *);
4567 };
4568
4569 static struct data_cmd drbd_cmd_handler[] = {
4570         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4571         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4572         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4573         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4574         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4575         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4576         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4577         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4578         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4579         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4580         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4581         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4582         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4583         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4584         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4585         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4586         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4587         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4588         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4589         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4590         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4591         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4592         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4593         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4594         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4595 };
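/*
 * Dispatch table for the data socket: pkt_size is the fixed sub-header
 * that drbdd() reads into the socket buffer before calling fn, and
 * expect_payload says whether the packet may carry additional payload
 * beyond that sub-header (anything else is a protocol error).
 */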
4596
4597 static void drbdd(struct drbd_connection *connection)
4598 {
4599         struct packet_info pi;
4600         size_t shs; /* sub header size */
4601         int err;
4602
4603         while (get_t_state(&connection->receiver) == RUNNING) {
4604                 struct data_cmd *cmd;
4605
4606                 drbd_thread_current_set_cpu(&connection->receiver);
4607                 update_receiver_timing_details(connection, drbd_recv_header);
4608                 if (drbd_recv_header(connection, &pi))
4609                         goto err_out;
4610
4611                 cmd = &drbd_cmd_handler[pi.cmd];
4612                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4613                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4614                                  cmdname(pi.cmd), pi.cmd);
4615                         goto err_out;
4616                 }
4617
4618                 shs = cmd->pkt_size;
4619                 if (pi.size > shs && !cmd->expect_payload) {
4620                         drbd_err(connection, "No payload expected %s l:%d\n",
4621                                  cmdname(pi.cmd), pi.size);
4622                         goto err_out;
4623                 }
4624
4625                 if (shs) {
4626                         update_receiver_timing_details(connection, drbd_recv_all_warn);
4627                         err = drbd_recv_all_warn(connection, pi.data, shs);
4628                         if (err)
4629                                 goto err_out;
4630                         pi.size -= shs;
4631                 }
4632
4633                 update_receiver_timing_details(connection, cmd->fn);
4634                 err = cmd->fn(connection, &pi);
4635                 if (err) {
4636                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4637                                  cmdname(pi.cmd), err, pi.size);
4638                         goto err_out;
4639                 }
4640         }
4641         return;
4642
4643     err_out:
4644         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4645 }
4646
4647 static void conn_disconnect(struct drbd_connection *connection)
4648 {
4649         struct drbd_peer_device *peer_device;
4650         enum drbd_conns oc;
4651         int vnr;
4652
4653         if (connection->cstate == C_STANDALONE)
4654                 return;
4655
4656         /* We are about to start the cleanup after connection loss.
4657          * Make sure drbd_make_request knows about that.
4658          * Usually we should be in some network failure state already,
4659          * but just in case we are not, we fix it up here.
4660          */
4661         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4662
4663         /* asender does not clean up anything. it must not interfere, either */
4664         drbd_thread_stop(&connection->asender);
4665         drbd_free_sock(connection);
4666
4667         rcu_read_lock();
4668         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4669                 struct drbd_device *device = peer_device->device;
4670                 kref_get(&device->kref);
4671                 rcu_read_unlock();
4672                 drbd_disconnected(peer_device);
4673                 kref_put(&device->kref, drbd_destroy_device);
4674                 rcu_read_lock();
4675         }
4676         rcu_read_unlock();
4677
4678         if (!list_empty(&connection->current_epoch->list))
4679                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4680         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4681         atomic_set(&connection->current_epoch->epoch_size, 0);
4682         connection->send.seen_any_write_yet = false;
4683
4684         drbd_info(connection, "Connection closed\n");
4685
4686         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4687                 conn_try_outdate_peer_async(connection);
4688
4689         spin_lock_irq(&connection->resource->req_lock);
4690         oc = connection->cstate;
4691         if (oc >= C_UNCONNECTED)
4692                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4693
4694         spin_unlock_irq(&connection->resource->req_lock);
4695
4696         if (oc == C_DISCONNECTING)
4697                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4698 }
4699
4700 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4701 {
4702         struct drbd_device *device = peer_device->device;
4703         unsigned int i;
4704
4705         /* wait for current activity to cease. */
4706         spin_lock_irq(&device->resource->req_lock);
4707         _drbd_wait_ee_list_empty(device, &device->active_ee);
4708         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4709         _drbd_wait_ee_list_empty(device, &device->read_ee);
4710         spin_unlock_irq(&device->resource->req_lock);
4711
4712         /* We do not have data structures that would allow us to
4713          * get the rs_pending_cnt down to 0 again.
4714          *  * On C_SYNC_TARGET we do not have any data structures describing
4715          *    the pending RSDataRequest's we have sent.
4716          *  * On C_SYNC_SOURCE there is no data structure that tracks
4717          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4718          *  And no, it is not the sum of the reference counts in the
4719          *  resync_LRU. The resync_LRU tracks the whole operation including
4720          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4721          *  on the fly. */
4722         drbd_rs_cancel_all(device);
4723         device->rs_total = 0;
4724         device->rs_failed = 0;
4725         atomic_set(&device->rs_pending_cnt, 0);
4726         wake_up(&device->misc_wait);
4727
4728         del_timer_sync(&device->resync_timer);
4729         resync_timer_fn((unsigned long)device);
4730
4731         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4732          * w_make_resync_request etc. which may still be on the worker queue
4733          * to be "canceled" */
4734         drbd_flush_workqueue(&peer_device->connection->sender_work);
4735
4736         drbd_finish_peer_reqs(device);
4737
4738         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4739            might have queued work again. The one before drbd_finish_peer_reqs() is
4740            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4741         drbd_flush_workqueue(&peer_device->connection->sender_work);
4742
4743         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4744          * again via drbd_try_clear_on_disk_bm(). */
4745         drbd_rs_cancel_all(device);
4746
4747         kfree(device->p_uuid);
4748         device->p_uuid = NULL;
4749
4750         if (!drbd_suspended(device))
4751                 tl_clear(peer_device->connection);
4752
4753         drbd_md_sync(device);
4754
4755         /* serialize with bitmap writeout triggered by the state change,
4756          * if any. */
4757         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4758
4759         /* tcp_close and release of sendpage pages can be deferred.  I don't
4760          * want to use SO_LINGER, because apparently it can be deferred for
4761          * more than 20 seconds (longest time I checked).
4762          *
4763          * Actually we don't care for exactly when the network stack does its
4764          * put_page(), but release our reference on these pages right here.
4765          */
4766         i = drbd_free_peer_reqs(device, &device->net_ee);
4767         if (i)
4768                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4769         i = atomic_read(&device->pp_in_use_by_net);
4770         if (i)
4771                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4772         i = atomic_read(&device->pp_in_use);
4773         if (i)
4774                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4775
4776         D_ASSERT(device, list_empty(&device->read_ee));
4777         D_ASSERT(device, list_empty(&device->active_ee));
4778         D_ASSERT(device, list_empty(&device->sync_ee));
4779         D_ASSERT(device, list_empty(&device->done_ee));
4780
4781         return 0;
4782 }
4783
4784 /*
4785  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4786  * we can agree on is stored in agreed_pro_version.
4787  *
4788  * feature flags and the reserved array should be enough room for future
4789  * enhancements of the handshake protocol, and possible plugins...
4790  *
4791  * for now, they are expected to be zero, and are otherwise ignored.
4792  */
4793 static int drbd_send_features(struct drbd_connection *connection)
4794 {
4795         struct drbd_socket *sock;
4796         struct p_connection_features *p;
4797
4798         sock = &connection->data;
4799         p = conn_prepare_command(connection, sock);
4800         if (!p)
4801                 return -EIO;
4802         memset(p, 0, sizeof(*p));
4803         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4804         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4805         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4806         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4807 }
4808
4809 /*
4810  * return values:
4811  *   1 yes, we have a valid connection
4812  *   0 oops, did not work out, please try again
4813  *  -1 peer talks different language,
4814  *     no point in trying again, please go standalone.
4815  */
4816 static int drbd_do_features(struct drbd_connection *connection)
4817 {
4818         /* ASSERT current == connection->receiver ... */
4819         struct p_connection_features *p;
4820         const int expect = sizeof(struct p_connection_features);
4821         struct packet_info pi;
4822         int err;
4823
4824         err = drbd_send_features(connection);
4825         if (err)
4826                 return 0;
4827
4828         err = drbd_recv_header(connection, &pi);
4829         if (err)
4830                 return 0;
4831
4832         if (pi.cmd != P_CONNECTION_FEATURES) {
4833                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4834                          cmdname(pi.cmd), pi.cmd);
4835                 return -1;
4836         }
4837
4838         if (pi.size != expect) {
4839                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4840                      expect, pi.size);
4841                 return -1;
4842         }
4843
4844         p = pi.data;
4845         err = drbd_recv_all_warn(connection, p, expect);
4846         if (err)
4847                 return 0;
4848
4849         p->protocol_min = be32_to_cpu(p->protocol_min);
4850         p->protocol_max = be32_to_cpu(p->protocol_max);
4851         if (p->protocol_max == 0)
4852                 p->protocol_max = p->protocol_min;
4853
4854         if (PRO_VERSION_MAX < p->protocol_min ||
4855             PRO_VERSION_MIN > p->protocol_max)
4856                 goto incompat;
4857
4858         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4859         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4860
4861         drbd_info(connection, "Handshake successful: "
4862              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4863
4864         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4865                   connection->agreed_features & FF_TRIM ? " " : " not ");
4866
4867         return 1;
4868
4869  incompat:
4870         drbd_err(connection, "incompatible DRBD dialects: "
4871             "I support %d-%d, peer supports %d-%d\n",
4872             PRO_VERSION_MIN, PRO_VERSION_MAX,
4873             p->protocol_min, p->protocol_max);
4874         return -1;
4875 }
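/*
 * Version negotiation example (numbers illustrative): if we support
 * protocols 86..101 and the peer advertises min 87, max 96, the ranges
 * overlap and agreed_pro_version becomes min(101, 96) = 96. A peer that
 * predates the max field sends 0, which is treated as max = min.
 */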
4876
4877 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4878 static int drbd_do_auth(struct drbd_connection *connection)
4879 {
4880         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4881         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4882         return -1;
4883 }
4884 #else
4885 #define CHALLENGE_LEN 64
4886
4887 /* Return value:
4888         1 - auth succeeded,
4889         0 - failed, try again (network error),
4890         -1 - auth failed, don't try again.
4891 */
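/*
 * The exchange below is a symmetric challenge/response: each side sends a
 * random CHALLENGE_LEN byte nonce (P_AUTH_CHALLENGE), answers the peer's
 * nonce with HMAC(shared_secret, peers_challenge) in P_AUTH_RESPONSE, and
 * compares the response it receives against the HMAC it computed over its
 * own challenge. A peer echoing our own challenge back is rejected to
 * prevent trivial reflection.
 */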
4892
4893 static int drbd_do_auth(struct drbd_connection *connection)
4894 {
4895         struct drbd_socket *sock;
4896         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4897         struct scatterlist sg;
4898         char *response = NULL;
4899         char *right_response = NULL;
4900         char *peers_ch = NULL;
4901         unsigned int key_len;
4902         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4903         unsigned int resp_size;
4904         struct hash_desc desc;
4905         struct packet_info pi;
4906         struct net_conf *nc;
4907         int err, rv;
4908
4909         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4910
4911         rcu_read_lock();
4912         nc = rcu_dereference(connection->net_conf);
4913         key_len = strlen(nc->shared_secret);
4914         memcpy(secret, nc->shared_secret, key_len);
4915         rcu_read_unlock();
4916
4917         desc.tfm = connection->cram_hmac_tfm;
4918         desc.flags = 0;
4919
4920         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4921         if (rv) {
4922                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4923                 rv = -1;
4924                 goto fail;
4925         }
4926
4927         get_random_bytes(my_challenge, CHALLENGE_LEN);
4928
4929         sock = &connection->data;
4930         if (!conn_prepare_command(connection, sock)) {
4931                 rv = 0;
4932                 goto fail;
4933         }
4934         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4935                                 my_challenge, CHALLENGE_LEN);
4936         if (!rv)
4937                 goto fail;
4938
4939         err = drbd_recv_header(connection, &pi);
4940         if (err) {
4941                 rv = 0;
4942                 goto fail;
4943         }
4944
4945         if (pi.cmd != P_AUTH_CHALLENGE) {
4946                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4947                          cmdname(pi.cmd), pi.cmd);
4948                 rv = 0;
4949                 goto fail;
4950         }
4951
4952         if (pi.size > CHALLENGE_LEN * 2) {
4953                 drbd_err(connection, "AuthChallenge payload too big.\n");
4954                 rv = -1;
4955                 goto fail;
4956         }
4957
4958         if (pi.size < CHALLENGE_LEN) {
4959                 drbd_err(connection, "AuthChallenge payload too small.\n");
4960                 rv = -1;
4961                 goto fail;
4962         }
4963
4964         peers_ch = kmalloc(pi.size, GFP_NOIO);
4965         if (peers_ch == NULL) {
4966                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4967                 rv = -1;
4968                 goto fail;
4969         }
4970
4971         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4972         if (err) {
4973                 rv = 0;
4974                 goto fail;
4975         }
4976
4977         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4978                 drbd_err(connection, "Peer presented the same challenge!\n");
4979                 rv = -1;
4980                 goto fail;
4981         }
4982
4983         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4984         response = kmalloc(resp_size, GFP_NOIO);
4985         if (response == NULL) {
4986                 drbd_err(connection, "kmalloc of response failed\n");
4987                 rv = -1;
4988                 goto fail;
4989         }
4990
4991         sg_init_table(&sg, 1);
4992         sg_set_buf(&sg, peers_ch, pi.size);
4993
4994         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4995         if (rv) {
4996                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4997                 rv = -1;
4998                 goto fail;
4999         }
5000
5001         if (!conn_prepare_command(connection, sock)) {
5002                 rv = 0;
5003                 goto fail;
5004         }
5005         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5006                                 response, resp_size);
5007         if (!rv)
5008                 goto fail;
5009
5010         err = drbd_recv_header(connection, &pi);
5011         if (err) {
5012                 rv = 0;
5013                 goto fail;
5014         }
5015
5016         if (pi.cmd != P_AUTH_RESPONSE) {
5017                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5018                          cmdname(pi.cmd), pi.cmd);
5019                 rv = 0;
5020                 goto fail;
5021         }
5022
5023         if (pi.size != resp_size) {
5024                 drbd_err(connection, "AuthResponse payload of unexpected size\n");
5025                 rv = 0;
5026                 goto fail;
5027         }
5028
5029         err = drbd_recv_all_warn(connection, response, resp_size);
5030         if (err) {
5031                 rv = 0;
5032                 goto fail;
5033         }
5034
5035         right_response = kmalloc(resp_size, GFP_NOIO);
5036         if (right_response == NULL) {
5037                 drbd_err(connection, "kmalloc of right_response failed\n");
5038                 rv = -1;
5039                 goto fail;
5040         }
5041
5042         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5043
5044         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5045         if (rv) {
5046                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5047                 rv = -1;
5048                 goto fail;
5049         }
5050
5051         rv = !memcmp(response, right_response, resp_size);
5052
5053         if (rv)
5054                 drbd_info(connection, "Peer authenticated using a %d byte HMAC\n",
5055                      resp_size);
5056         else
5057                 rv = -1;
5058
5059  fail:
5060         kfree(peers_ch);
5061         kfree(response);
5062         kfree(right_response);
5063
5064         return rv;
5065 }
5066 #endif
5067
5068 int drbd_receiver(struct drbd_thread *thi)
5069 {
5070         struct drbd_connection *connection = thi->connection;
5071         int h;
5072
5073         drbd_info(connection, "receiver (re)started\n");
5074
5075         do {
5076                 h = conn_connect(connection);
5077                 if (h == 0) {
5078                         conn_disconnect(connection);
5079                         schedule_timeout_interruptible(HZ);
5080                 }
5081                 if (h == -1) {
5082                         drbd_warn(connection, "Discarding network configuration.\n");
5083                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5084                 }
5085         } while (h == 0);
5086
5087         if (h > 0)
5088                 drbdd(connection);
5089
5090         conn_disconnect(connection);
5091
5092         drbd_info(connection, "receiver terminated\n");
5093         return 0;
5094 }
5095
5096 /* ********* acknowledge sender ******** */
5097
5098 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5099 {
5100         struct p_req_state_reply *p = pi->data;
5101         int retcode = be32_to_cpu(p->retcode);
5102
5103         if (retcode >= SS_SUCCESS) {
5104                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5105         } else {
5106                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5107                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5108                          drbd_set_st_err_str(retcode), retcode);
5109         }
5110         wake_up(&connection->ping_wait);
5111
5112         return 0;
5113 }
5114
5115 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5116 {
5117         struct drbd_peer_device *peer_device;
5118         struct drbd_device *device;
5119         struct p_req_state_reply *p = pi->data;
5120         int retcode = be32_to_cpu(p->retcode);
5121
5122         peer_device = conn_peer_device(connection, pi->vnr);
5123         if (!peer_device)
5124                 return -EIO;
5125         device = peer_device->device;
5126
5127         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5128                 D_ASSERT(device, connection->agreed_pro_version < 100);
5129                 return got_conn_RqSReply(connection, pi);
5130         }
5131
5132         if (retcode >= SS_SUCCESS) {
5133                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5134         } else {
5135                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5136                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5137                         drbd_set_st_err_str(retcode), retcode);
5138         }
5139         wake_up(&device->state_wait);
5140
5141         return 0;
5142 }
5143
5144 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5145 {
5146         return drbd_send_ping_ack(connection);
5147
5148 }
5149
5150 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5151 {
5152         /* restore idle timeout */
5153         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int * HZ;
5154         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5155                 wake_up(&connection->ping_wait);
5156
5157         return 0;
5158 }
5159
5160 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5161 {
5162         struct drbd_peer_device *peer_device;
5163         struct drbd_device *device;
5164         struct p_block_ack *p = pi->data;
5165         sector_t sector = be64_to_cpu(p->sector);
5166         int blksize = be32_to_cpu(p->blksize);
5167
5168         peer_device = conn_peer_device(connection, pi->vnr);
5169         if (!peer_device)
5170                 return -EIO;
5171         device = peer_device->device;
5172
5173         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5174
5175         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5176
5177         if (get_ldev(device)) {
5178                 drbd_rs_complete_io(device, sector);
5179                 drbd_set_in_sync(device, sector, blksize);
5180                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5181                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5182                 put_ldev(device);
5183         }
5184         dec_rs_pending(device);
5185         atomic_add(blksize >> 9, &device->rs_sect_in);
5186
5187         return 0;
5188 }
5189
5190 static int
5191 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5192                               struct rb_root *root, const char *func,
5193                               enum drbd_req_event what, bool missing_ok)
5194 {
5195         struct drbd_request *req;
5196         struct bio_and_error m;
5197
5198         spin_lock_irq(&device->resource->req_lock);
5199         req = find_request(device, root, id, sector, missing_ok, func);
5200         if (unlikely(!req)) {
5201                 spin_unlock_irq(&device->resource->req_lock);
5202                 return -EIO;
5203         }
5204         __req_mod(req, what, &m);
5205         spin_unlock_irq(&device->resource->req_lock);
5206
5207         if (m.bio)
5208                 complete_master_bio(device, &m);
5209         return 0;
5210 }
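/*
 * The helper above resolves an ack to the original request by (block_id,
 * sector) in the given rb_root, feeds the corresponding event into the
 * request state machine via __req_mod(), and completes the master bio if
 * that transition finished it. missing_ok covers acks that may arrive
 * after the request was already removed (e.g. P_NEG_ACK in protocol A).
 */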
5211
5212 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5213 {
5214         struct drbd_peer_device *peer_device;
5215         struct drbd_device *device;
5216         struct p_block_ack *p = pi->data;
5217         sector_t sector = be64_to_cpu(p->sector);
5218         int blksize = be32_to_cpu(p->blksize);
5219         enum drbd_req_event what;
5220
5221         peer_device = conn_peer_device(connection, pi->vnr);
5222         if (!peer_device)
5223                 return -EIO;
5224         device = peer_device->device;
5225
5226         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5227
5228         if (p->block_id == ID_SYNCER) {
5229                 drbd_set_in_sync(device, sector, blksize);
5230                 dec_rs_pending(device);
5231                 return 0;
5232         }
5233         switch (pi->cmd) {
5234         case P_RS_WRITE_ACK:
5235                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5236                 break;
5237         case P_WRITE_ACK:
5238                 what = WRITE_ACKED_BY_PEER;
5239                 break;
5240         case P_RECV_ACK:
5241                 what = RECV_ACKED_BY_PEER;
5242                 break;
5243         case P_SUPERSEDED:
5244                 what = CONFLICT_RESOLVED;
5245                 break;
5246         case P_RETRY_WRITE:
5247                 what = POSTPONE_WRITE;
5248                 break;
5249         default:
5250                 BUG();
5251         }
5252
5253         return validate_req_change_req_state(device, p->block_id, sector,
5254                                              &device->write_requests, __func__,
5255                                              what, false);
5256 }
5257
5258 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5259 {
5260         struct drbd_peer_device *peer_device;
5261         struct drbd_device *device;
5262         struct p_block_ack *p = pi->data;
5263         sector_t sector = be64_to_cpu(p->sector);
5264         int size = be32_to_cpu(p->blksize);
5265         int err;
5266
5267         peer_device = conn_peer_device(connection, pi->vnr);
5268         if (!peer_device)
5269                 return -EIO;
5270         device = peer_device->device;
5271
5272         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5273
5274         if (p->block_id == ID_SYNCER) {
5275                 dec_rs_pending(device);
5276                 drbd_rs_failed_io(device, sector, size);
5277                 return 0;
5278         }
5279
5280         err = validate_req_change_req_state(device, p->block_id, sector,
5281                                             &device->write_requests, __func__,
5282                                             NEG_ACKED, true);
5283         if (err) {
5284                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5285                    The master bio might already be completed, therefore the
5286                    request is no longer in the collision hash. */
5287                 /* In Protocol B we might already have got a P_RECV_ACK
5288                    but then get a P_NEG_ACK afterwards. */
5289                 drbd_set_out_of_sync(device, sector, size);
5290         }
5291         return 0;
5292 }
5293
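     /*
      * The peer could not serve a read request, e.g. because its disk
      * failed; fail the corresponding request in the read_requests tree.
      */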
5294 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5295 {
5296         struct drbd_peer_device *peer_device;
5297         struct drbd_device *device;
5298         struct p_block_ack *p = pi->data;
5299         sector_t sector = be64_to_cpu(p->sector);
5300
5301         peer_device = conn_peer_device(connection, pi->vnr);
5302         if (!peer_device)
5303                 return -EIO;
5304         device = peer_device->device;
5305
5306         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5307
5308         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5309             (unsigned long long)sector, be32_to_cpu(p->blksize));
5310
5311         return validate_req_change_req_state(device, p->block_id, sector,
5312                                              &device->read_requests, __func__,
5313                                              NEG_ACKED, false);
5314 }
5315
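     /*
      * A resync read was refused by the peer (P_NEG_RS_DREPLY) or
      * cancelled (P_RS_CANCEL).  Both complete the resync I/O; only the
      * former additionally accounts the range as failed.
      */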
5316 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5317 {
5318         struct drbd_peer_device *peer_device;
5319         struct drbd_device *device;
5320         sector_t sector;
5321         int size;
5322         struct p_block_ack *p = pi->data;
5323
5324         peer_device = conn_peer_device(connection, pi->vnr);
5325         if (!peer_device)
5326                 return -EIO;
5327         device = peer_device->device;
5328
5329         sector = be64_to_cpu(p->sector);
5330         size = be32_to_cpu(p->blksize);
5331
5332         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5333
5334         dec_rs_pending(device);
5335
5336         if (get_ldev_if_state(device, D_FAILED)) {
5337                 drbd_rs_complete_io(device, sector);
5338                 switch (pi->cmd) {
5339                 case P_NEG_RS_DREPLY:
5340                         drbd_rs_failed_io(device, sector, size);
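                             /* fall through */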
5341                 case P_RS_CANCEL:
5342                         break;
5343                 default:
5344                         BUG();
5345                 }
5346                 put_ldev(device);
5347         }
5348
5349         return 0;
5350 }
5351
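     /*
      * The peer confirmed a whole epoch: release the transfer log up to
      * the acked barrier.  Volumes that are Ahead and have no application
      * I/O left in flight arm the timer that turns them back into a
      * sync source.
      */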
5352 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5353 {
5354         struct p_barrier_ack *p = pi->data;
5355         struct drbd_peer_device *peer_device;
5356         int vnr;
5357
5358         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5359
5360         rcu_read_lock();
5361         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5362                 struct drbd_device *device = peer_device->device;
5363
5364                 if (device->state.conn == C_AHEAD &&
5365                     atomic_read(&device->ap_in_flight) == 0 &&
5366                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5367                         device->start_resync_timer.expires = jiffies + HZ;
5368                         add_timer(&device->start_resync_timer);
5369                 }
5370         }
5371         rcu_read_unlock();
5372
5373         return 0;
5374 }
5375
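     /*
      * One online-verify reply: record a mismatch if the peer reported
      * ID_OUT_OF_SYNC, otherwise flush the accumulated out-of-sync report.
      * When the last block has been verified, hand w_ov_finished to the
      * worker, falling back to finishing synchronously if the work item
      * cannot be allocated.
      */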
5376 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5377 {
5378         struct drbd_peer_device *peer_device;
5379         struct drbd_device *device;
5380         struct p_block_ack *p = pi->data;
5381         struct drbd_device_work *dw;
5382         sector_t sector;
5383         int size;
5384
5385         peer_device = conn_peer_device(connection, pi->vnr);
5386         if (!peer_device)
5387                 return -EIO;
5388         device = peer_device->device;
5389
5390         sector = be64_to_cpu(p->sector);
5391         size = be32_to_cpu(p->blksize);
5392
5393         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5394
5395         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5396                 drbd_ov_out_of_sync_found(device, sector, size);
5397         else
5398                 ov_out_of_sync_print(device);
5399
5400         if (!get_ldev(device))
5401                 return 0;
5402
5403         drbd_rs_complete_io(device, sector);
5404         dec_rs_pending(device);
5405
5406         --device->ov_left;
5407
5408         /* let's advance progress step marks only for every other megabyte */
5409         if ((device->ov_left & 0x200) == 0x200)
5410                 drbd_advance_rs_marks(device, device->ov_left);
5411
5412         if (device->ov_left == 0) {
5413                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5414                 if (dw) {
5415                         dw->w.cb = w_ov_finished;
5416                         dw->device = device;
5417                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5418                 } else {
5419                         drbd_err(device, "kmalloc(dw) failed.");
5420                         ov_out_of_sync_print(device);
5421                         drbd_resync_finished(device);
5422                 }
5423         }
5424         put_ldev(device);
5425         return 0;
5426 }
5427
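     /* The payload was already received into the meta receive buffer;
      * nothing to do for packets we deliberately ignore (P_DELAY_PROBE). */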
5428 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5429 {
5430         return 0;
5431 }
5432
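     /*
      * Send the acknowledgements for all completed peer requests: clear
      * any pending asender signal, process each volume's done_ee list,
      * and loop until no device has entries left.  Returns 1 if sending
      * the acks for some device failed, 0 otherwise.
      */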
5433 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5434 {
5435         struct drbd_peer_device *peer_device;
5436         int vnr, not_empty = 0;
5437
5438         do {
5439                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5440                 flush_signals(current);
5441
5442                 rcu_read_lock();
5443                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5444                         struct drbd_device *device = peer_device->device;
5445                         kref_get(&device->kref);
5446                         rcu_read_unlock();
5447                         if (drbd_finish_peer_reqs(device)) {
5448                                 kref_put(&device->kref, drbd_destroy_device);
5449                                 return 1;
5450                         }
5451                         kref_put(&device->kref, drbd_destroy_device);
5452                         rcu_read_lock();
5453                 }
5454                 set_bit(SIGNAL_ASENDER, &connection->flags);
5455
5456                 spin_lock_irq(&connection->resource->req_lock);
5457                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5458                         struct drbd_device *device = peer_device->device;
5459                         not_empty = !list_empty(&device->done_ee);
5460                         if (not_empty)
5461                                 break;
5462                 }
5463                 spin_unlock_irq(&connection->resource->req_lock);
5464                 rcu_read_unlock();
5465         } while (not_empty);
5466
5467         return 0;
5468 }
5469
5470 struct asender_cmd {
5471         size_t pkt_size;
5472         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5473 };
5474
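     /*
      * Dispatch table for packets on the meta socket, indexed by packet
      * type.  pkt_size is the payload size expected after the header;
      * the handler runs only once the full payload has been received.
      */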
5475 static struct asender_cmd asender_tbl[] = {
5476         [P_PING]            = { 0, got_Ping },
5477         [P_PING_ACK]        = { 0, got_PingAck },
5478         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5479         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5480         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5481         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5482         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5483         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5484         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5485         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5486         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5487         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5488         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5489         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5490         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5491         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5492         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5493 };
5494
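     /*
      * drbd_asender() - receiver thread for the meta ("ack") socket
      *
      * Runs at SCHED_RR priority.  Each iteration of the main loop
      *  - sends a P_PING when requested and arms the ping timeout,
      *  - corks the socket (if configured), sends the acks for completed
      *    peer requests, then uncorks,
      *  - receives a packet header plus the payload size announced in
      *    asender_tbl[] and dispatches the handler.
      * A receive timeout (-EAGAIN) means either a missed P_PING_ACK,
      * which forces a reconnect, or an idle period, which triggers
      * another ping.
      */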
5495 int drbd_asender(struct drbd_thread *thi)
5496 {
5497         struct drbd_connection *connection = thi->connection;
5498         struct asender_cmd *cmd = NULL;
5499         struct packet_info pi;
5500         int rv;
5501         void *buf    = connection->meta.rbuf;
5502         int received = 0;
5503         unsigned int header_size = drbd_header_size(connection);
5504         int expect   = header_size;
5505         bool ping_timeout_active = false;
5506         struct net_conf *nc;
5507         int ping_timeo, tcp_cork, ping_int;
5508         struct sched_param param = { .sched_priority = 2 };
5509
5510         rv = sched_setscheduler(current, SCHED_RR, &param);
5511         if (rv < 0)
5512                 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5513
5514         while (get_t_state(thi) == RUNNING) {
5515                 drbd_thread_current_set_cpu(thi);
5516
5517                 rcu_read_lock();
5518                 nc = rcu_dereference(connection->net_conf);
5519                 ping_timeo = nc->ping_timeo;
5520                 tcp_cork = nc->tcp_cork;
5521                 ping_int = nc->ping_int;
5522                 rcu_read_unlock();
5523
5524                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5525                         if (drbd_send_ping(connection)) {
5526                                 drbd_err(connection, "drbd_send_ping has failed\n");
5527                                 goto reconnect;
5528                         }
5529                         connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5530                         ping_timeout_active = true;
5531                 }
5532
5533                 /* TODO: conditionally cork; it may hurt latency if we cork without
5534                    much to send */
5535                 if (tcp_cork)
5536                         drbd_tcp_cork(connection->meta.socket);
5537                 if (connection_finish_peer_reqs(connection)) {
5538                         drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5539                         goto reconnect;
5540                 }
5541                 /* but uncork afterwards, unless corking is disabled */
5542                 if (tcp_cork)
5543                         drbd_tcp_uncork(connection->meta.socket);
5544
5545                 /* short circuit, recv_msg would return EINTR anyway. */
5546                 if (signal_pending(current))
5547                         continue;
5548
5549                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5550                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5551
5552                 flush_signals(current);
5553
5554                 /* Note:
5555                  * -EINTR        (on meta) we got a signal
5556                  * -EAGAIN       (on meta) rcvtimeo expired
5557                  * -ECONNRESET   other side closed the connection
5558                  * -ERESTARTSYS  (on data) we got a signal
5559                  * rv <  0       other than above: unexpected error!
5560                  * rv == expected: full header or command
5561                  * rv <  expected: "woken" by signal during receive
5562                  * rv == 0       : "connection shut down by peer"
5563                  */
5564 received_more:
5565                 if (likely(rv > 0)) {
5566                         received += rv;
5567                         buf      += rv;
5568                 } else if (rv == 0) {
5569                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5570                                 long t;
5571                                 rcu_read_lock();
5572                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5573                                 rcu_read_unlock();
5574
5575                                 t = wait_event_timeout(connection->ping_wait,
5576                                                        connection->cstate < C_WF_REPORT_PARAMS,
5577                                                        t);
5578                                 if (t)
5579                                         break;
5580                         }
5581                         drbd_err(connection, "meta connection shut down by peer.\n");
5582                         goto reconnect;
5583                 } else if (rv == -EAGAIN) {
5584                         /* If the data socket received something meanwhile,
5585                          * that is good enough: peer is still alive. */
5586                         if (time_after(connection->last_received,
5587                                 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5588                                 continue;
5589                         if (ping_timeout_active) {
5590                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5591                                 goto reconnect;
5592                         }
5593                         set_bit(SEND_PING, &connection->flags);
5594                         continue;
5595                 } else if (rv == -EINTR) {
5596                         continue;
5597                 } else {
5598                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5599                         goto reconnect;
5600                 }
5601
5602                 if (received == expect && cmd == NULL) {
5603                         if (decode_header(connection, connection->meta.rbuf, &pi))
5604                                 goto reconnect;
5605                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !asender_tbl[pi.cmd].fn) {
5606                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5607                                          cmdname(pi.cmd), pi.cmd);
5608                                 goto disconnect;
5609                         }
5610                         cmd = &asender_tbl[pi.cmd];
5611                         expect = header_size + cmd->pkt_size;
5612                         if (pi.size != expect - header_size) {
5613                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5614                                         pi.cmd, pi.size);
5615                                 goto reconnect;
5616                         }
5617                 }
5618                 if (received == expect) {
5619                         int err;
5620
5621                         err = cmd->fn(connection, &pi);
5622                         if (err) {
5623                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5624                                 goto reconnect;
5625                         }
5626
5627                         connection->last_received = jiffies;
5628
5629                         if (cmd == &asender_tbl[P_PING_ACK]) {
5630                                 /* restore idle timeout */
5631                                 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5632                                 ping_timeout_active = false;
5633                         }
5634
5635                         buf      = connection->meta.rbuf;
5636                         received = 0;
5637                         expect   = header_size;
5638                         cmd      = NULL;
5639                 }
5640                 if (test_bit(SEND_PING, &connection->flags))
5641                         continue;
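                     /* Opportunistically drain whatever already arrived,
                      * without blocking. */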
5642                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5643                 if (rv > 0)
5644                         goto received_more;
5645         }
5646
5647         if (0) {
5648 reconnect:
5649                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5650                 conn_md_sync(connection);
5651         }
5652         if (0) {
5653 disconnect:
5654                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5655         }
5656         clear_bit(SIGNAL_ASENDER, &connection->flags);
5657
5658         drbd_info(connection, "asender terminated\n");
5659
5660         return 0;
5661 }