These changes are the raw update to the linux-4.4.6-rt14 kernel sources.
[kvmfornfv.git] / kernel/drivers/block/drbd/drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <asm/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (FF_TRIM)
52
53 struct packet_info {
54         enum drbd_packet cmd;
55         unsigned int size;
56         unsigned int vnr;
57         void *data;
58 };
59
60 enum finish_epoch {
61         FE_STILL_LIVE,
62         FE_DESTROYED,
63         FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
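/* Opportunistic, non-sleeping allocation: no __GFP_RECLAIM bits are set, so
 * alloc_page() will neither block nor trigger write-out; callers cope with
 * failure (see __drbd_alloc_pages() below). */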
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77  * some helper functions to deal with singly linked page lists,
78  * page->private being our "next" pointer.
79  */
80
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87         struct page *page;
88         struct page *tmp;
89
90         BUG_ON(!n);
91         BUG_ON(!head);
92
93         page = *head;
94
95         if (!page)
96                 return NULL;
97
98         while (page) {
99                 tmp = page_chain_next(page);
100                 if (--n == 0)
101                         break; /* found sufficient pages */
102                 if (tmp == NULL)
103                         /* insufficient pages, don't use any of them. */
104                         return NULL;
105                 page = tmp;
106         }
107
108         /* add end of list marker for the returned list */
109         set_page_private(page, 0);
110         /* actual return value, and adjustment of head */
111         page = *head;
112         *head = tmp;
113         return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121         struct page *tmp;
122         int i = 1;
123         while ((tmp = page_chain_next(page)))
124                 ++i, page = tmp;
125         if (len)
126                 *len = i;
127         return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132         struct page *tmp;
133         int i = 0;
134         page_chain_for_each_safe(page, tmp) {
135                 put_page(page);
136                 ++i;
137         }
138         return i;
139 }
140
141 static void page_chain_add(struct page **head,
142                 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
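        /* Paranoia: verify that chain_last really is the tail of chain_first. */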
145         struct page *tmp;
146         tmp = page_chain_tail(chain_first, NULL);
147         BUG_ON(tmp != chain_last);
148 #endif
149
150         /* add chain to head */
151         set_page_private(chain_last, (unsigned long)*head);
152         *head = chain_first;
153 }
154
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156                                        unsigned int number)
157 {
158         struct page *page = NULL;
159         struct page *tmp = NULL;
160         unsigned int i = 0;
161
162         /* Yes, testing drbd_pp_vacant outside the lock is racy.
163          * So what. It saves a spin_lock. */
164         if (drbd_pp_vacant >= number) {
165                 spin_lock(&drbd_pp_lock);
166                 page = page_chain_del(&drbd_pp_pool, number);
167                 if (page)
168                         drbd_pp_vacant -= number;
169                 spin_unlock(&drbd_pp_lock);
170                 if (page)
171                         return page;
172         }
173
174         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175          * "criss-cross" setup, that might cause write-out on some other DRBD,
176          * which in turn might block on the other node at this very place.  */
177         for (i = 0; i < number; i++) {
178                 tmp = alloc_page(GFP_TRY);
179                 if (!tmp)
180                         break;
181                 set_page_private(tmp, (unsigned long)page);
182                 page = tmp;
183         }
184
185         if (i == number)
186                 return page;
187
188         /* Not enough pages immediately available this time.
189          * No need to jump around here, drbd_alloc_pages will retry this
190          * function "soon". */
191         if (page) {
192                 tmp = page_chain_tail(page, NULL);
193                 spin_lock(&drbd_pp_lock);
194                 page_chain_add(&drbd_pp_pool, page, tmp);
195                 drbd_pp_vacant += i;
196                 spin_unlock(&drbd_pp_lock);
197         }
198         return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202                                            struct list_head *to_be_freed)
203 {
204         struct drbd_peer_request *peer_req, *tmp;
205
206         /* The EEs are always appended to the end of the list. Since
207            they are sent in order over the wire, they have to finish
208            in order. As soon as we see the first one that is not finished,
209            we can stop examining the list... */
210
211         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(&peer_req->w.list, to_be_freed);
215         }
216 }
217
218 static void drbd_kick_lo_and_reclaim_net(struct drbd_device *device)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&device->resource->req_lock);
224         reclaim_finished_net_peer_reqs(device, &reclaimed);
225         spin_unlock_irq(&device->resource->req_lock);
226
227         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
228                 drbd_free_net_peer_req(device, peer_req);
229 }
230
231 /**
232  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
233  * @peer_device: DRBD device, in the context of its connection to the peer
234  * @number:     number of pages requested
235  * @retry:      whether to retry, if not enough pages are available right now
236  *
237  * Tries to allocate number pages, first from our own page pool, then from
238  * the kernel.
239  * Possibly retry until DRBD frees sufficient pages somewhere else.
240  *
241  * If this allocation would exceed the max_buffers setting, we throttle
242  * allocation (schedule_timeout) to give the system some room to breathe.
243  *
244  * We do not use max-buffers as hard limit, because it could lead to
245  * congestion and further to a distributed deadlock during online-verify or
246  * (checksum based) resync, if the max-buffers, socket buffer sizes and
247  * resync-rate settings are mis-configured.
248  *
249  * Returns a page chain linked via page->private.
250  */
251 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
252                               bool retry)
253 {
254         struct drbd_device *device = peer_device->device;
255         struct page *page = NULL;
256         struct net_conf *nc;
257         DEFINE_WAIT(wait);
258         unsigned int mxb;
259
260         rcu_read_lock();
261         nc = rcu_dereference(peer_device->connection->net_conf);
262         mxb = nc ? nc->max_buffers : 1000000;
263         rcu_read_unlock();
264
265         if (atomic_read(&device->pp_in_use) < mxb)
266                 page = __drbd_alloc_pages(device, number);
267
268         while (page == NULL) {
269                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
270
271                 drbd_kick_lo_and_reclaim_net(device);
272
273                 if (atomic_read(&device->pp_in_use) < mxb) {
274                         page = __drbd_alloc_pages(device, number);
275                         if (page)
276                                 break;
277                 }
278
279                 if (!retry)
280                         break;
281
282                 if (signal_pending(current)) {
283                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
284                         break;
285                 }
286
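                /* If the full HZ/10 elapsed without a wake-up from drbd_pp_wait,
                 * stop honouring max-buffers for this allocation, so that a
                 * too small setting cannot keep us stuck here forever. */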
287                 if (schedule_timeout(HZ/10) == 0)
288                         mxb = UINT_MAX;
289         }
290         finish_wait(&drbd_pp_wait, &wait);
291
292         if (page)
293                 atomic_add(number, &device->pp_in_use);
294         return page;
295 }
296
297 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
298  * Is also used from inside another spin_lock_irq(&resource->req_lock);
299  * Either links the page chain back to the global pool,
300  * or returns all pages to the system. */
301 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
302 {
303         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
304         int i;
305
306         if (page == NULL)
307                 return;
308
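        /* Keep the global pool at its target size (DRBD_MAX_BIO_SIZE/PAGE_SIZE
         * pages per minor): above that, release the chain to the system,
         * otherwise link it back into drbd_pp_pool. */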
309         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
310                 i = page_chain_free(page);
311         else {
312                 struct page *tmp;
313                 tmp = page_chain_tail(page, &i);
314                 spin_lock(&drbd_pp_lock);
315                 page_chain_add(&drbd_pp_pool, page, tmp);
316                 drbd_pp_vacant += i;
317                 spin_unlock(&drbd_pp_lock);
318         }
319         i = atomic_sub_return(i, a);
320         if (i < 0)
321                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
322                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
323         wake_up(&drbd_pp_wait);
324 }
325
326 /*
327 You need to hold the req_lock:
328  _drbd_wait_ee_list_empty()
329
330 You must not have the req_lock:
331  drbd_free_peer_req()
332  drbd_alloc_peer_req()
333  drbd_free_peer_reqs()
334  drbd_ee_fix_bhs()
335  drbd_finish_peer_reqs()
336  drbd_clear_done_ee()
337  drbd_wait_ee_list_empty()
338 */
339
340 struct drbd_peer_request *
341 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
342                     unsigned int data_size, bool has_payload, gfp_t gfp_mask) __must_hold(local)
343 {
344         struct drbd_device *device = peer_device->device;
345         struct drbd_peer_request *peer_req;
346         struct page *page = NULL;
347         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
348
349         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
350                 return NULL;
351
352         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
353         if (!peer_req) {
354                 if (!(gfp_mask & __GFP_NOWARN))
355                         drbd_err(device, "%s: allocation failed\n", __func__);
356                 return NULL;
357         }
358
359         if (has_payload && data_size) {
360                 page = drbd_alloc_pages(peer_device, nr_pages,
361                                         gfpflags_allow_blocking(gfp_mask));
362                 if (!page)
363                         goto fail;
364         }
365
366         memset(peer_req, 0, sizeof(*peer_req));
367         INIT_LIST_HEAD(&peer_req->w.list);
368         drbd_clear_interval(&peer_req->i);
369         peer_req->i.size = data_size;
370         peer_req->i.sector = sector;
371         peer_req->submit_jif = jiffies;
372         peer_req->peer_device = peer_device;
373         peer_req->pages = page;
374         /*
375          * The block_id is opaque to the receiver.  It is not endianness
376          * converted, and sent back to the sender unchanged.
377          */
378         peer_req->block_id = id;
379
380         return peer_req;
381
382  fail:
383         mempool_free(peer_req, drbd_ee_mempool);
384         return NULL;
385 }
386
387 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
388                        int is_net)
389 {
390         might_sleep();
391         if (peer_req->flags & EE_HAS_DIGEST)
392                 kfree(peer_req->digest);
393         drbd_free_pages(device, peer_req->pages, is_net);
394         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
395         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
396         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
397                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
398                 drbd_al_complete_io(device, &peer_req->i);
399         }
400         mempool_free(peer_req, drbd_ee_mempool);
401 }
402
403 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
404 {
405         LIST_HEAD(work_list);
406         struct drbd_peer_request *peer_req, *t;
407         int count = 0;
408         int is_net = list == &device->net_ee;
409
410         spin_lock_irq(&device->resource->req_lock);
411         list_splice_init(list, &work_list);
412         spin_unlock_irq(&device->resource->req_lock);
413
414         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
415                 __drbd_free_peer_req(device, peer_req, is_net);
416                 count++;
417         }
418         return count;
419 }
420
421 /*
422  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
423  */
424 static int drbd_finish_peer_reqs(struct drbd_device *device)
425 {
426         LIST_HEAD(work_list);
427         LIST_HEAD(reclaimed);
428         struct drbd_peer_request *peer_req, *t;
429         int err = 0;
430
431         spin_lock_irq(&device->resource->req_lock);
432         reclaim_finished_net_peer_reqs(device, &reclaimed);
433         list_splice_init(&device->done_ee, &work_list);
434         spin_unlock_irq(&device->resource->req_lock);
435
436         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
437                 drbd_free_net_peer_req(device, peer_req);
438
439         /* possible callbacks here:
440          * e_end_block, e_end_resync_block, and e_send_superseded.
441          * all ignore the last argument.
442          */
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 int err2;
445
446                 /* list_del not necessary, next/prev members not touched */
447                 err2 = peer_req->w.cb(&peer_req->w, !!err);
448                 if (!err)
449                         err = err2;
450                 drbd_free_peer_req(device, peer_req);
451         }
452         wake_up(&device->ee_wait);
453
454         return err;
455 }
456
457 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
458                                      struct list_head *head)
459 {
460         DEFINE_WAIT(wait);
461
462         /* avoids spin_lock/unlock
463          * and calling prepare_to_wait in the fast path */
464         while (!list_empty(head)) {
465                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
466                 spin_unlock_irq(&device->resource->req_lock);
467                 io_schedule();
468                 finish_wait(&device->ee_wait, &wait);
469                 spin_lock_irq(&device->resource->req_lock);
470         }
471 }
472
473 static void drbd_wait_ee_list_empty(struct drbd_device *device,
474                                     struct list_head *head)
475 {
476         spin_lock_irq(&device->resource->req_lock);
477         _drbd_wait_ee_list_empty(device, head);
478         spin_unlock_irq(&device->resource->req_lock);
479 }
480
481 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
482 {
483         struct kvec iov = {
484                 .iov_base = buf,
485                 .iov_len = size,
486         };
487         struct msghdr msg = {
488                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
489         };
490         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
491 }
492
493 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
494 {
495         int rv;
496
497         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
498
499         if (rv < 0) {
500                 if (rv == -ECONNRESET)
501                         drbd_info(connection, "sock was reset by peer\n");
502                 else if (rv != -ERESTARTSYS)
503                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
504         } else if (rv == 0) {
505                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
506                         long t;
507                         rcu_read_lock();
508                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
509                         rcu_read_unlock();
510
511                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
512
513                         if (t)
514                                 goto out;
515                 }
516                 drbd_info(connection, "sock was shut down by peer\n");
517         }
518
519         if (rv != size)
520                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
521
522 out:
523         return rv;
524 }
525
526 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
527 {
528         int err;
529
530         err = drbd_recv(connection, buf, size);
531         if (err != size) {
532                 if (err >= 0)
533                         err = -EIO;
534         } else
535                 err = 0;
536         return err;
537 }
538
539 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
540 {
541         int err;
542
543         err = drbd_recv_all(connection, buf, size);
544         if (err && !signal_pending(current))
545                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
546         return err;
547 }
548
549 /* quoting tcp(7):
550  *   On individual connections, the socket buffer size must be set prior to the
551  *   listen(2) or connect(2) calls in order to have it take effect.
552  * This is our wrapper to do so.
553  */
554 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
555                 unsigned int rcv)
556 {
557         /* open coded SO_SNDBUF, SO_RCVBUF */
558         if (snd) {
559                 sock->sk->sk_sndbuf = snd;
560                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
561         }
562         if (rcv) {
563                 sock->sk->sk_rcvbuf = rcv;
564                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
565         }
566 }
567
568 static struct socket *drbd_try_connect(struct drbd_connection *connection)
569 {
570         const char *what;
571         struct socket *sock;
572         struct sockaddr_in6 src_in6;
573         struct sockaddr_in6 peer_in6;
574         struct net_conf *nc;
575         int err, peer_addr_len, my_addr_len;
576         int sndbuf_size, rcvbuf_size, connect_int;
577         int disconnect_on_error = 1;
578
579         rcu_read_lock();
580         nc = rcu_dereference(connection->net_conf);
581         if (!nc) {
582                 rcu_read_unlock();
583                 return NULL;
584         }
585         sndbuf_size = nc->sndbuf_size;
586         rcvbuf_size = nc->rcvbuf_size;
587         connect_int = nc->connect_int;
588         rcu_read_unlock();
589
590         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
591         memcpy(&src_in6, &connection->my_addr, my_addr_len);
592
593         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
594                 src_in6.sin6_port = 0;
595         else
596                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
597
598         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
599         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
600
601         what = "sock_create_kern";
602         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
603                                SOCK_STREAM, IPPROTO_TCP, &sock);
604         if (err < 0) {
605                 sock = NULL;
606                 goto out;
607         }
608
609         sock->sk->sk_rcvtimeo =
610         sock->sk->sk_sndtimeo = connect_int * HZ;
611         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
612
613        /* explicitly bind to the configured IP as source IP
614         *  for the outgoing connections.
615         *  This is needed for multihomed hosts and to be
616         *  able to use lo: interfaces for drbd.
617         * Make sure to use 0 as port number, so linux selects
618         *  a free one dynamically.
619         */
620         what = "bind before connect";
621         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
622         if (err < 0)
623                 goto out;
624
625         /* connect may fail, peer not yet available.
626          * stay C_WF_CONNECTION, don't go Disconnecting! */
627         disconnect_on_error = 0;
628         what = "connect";
629         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
630
631 out:
632         if (err < 0) {
633                 if (sock) {
634                         sock_release(sock);
635                         sock = NULL;
636                 }
637                 switch (-err) {
638                         /* timeout, busy, signal pending */
639                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
640                 case EINTR: case ERESTARTSYS:
641                         /* peer not (yet) available, network problem */
642                 case ECONNREFUSED: case ENETUNREACH:
643                 case EHOSTDOWN:    case EHOSTUNREACH:
644                         disconnect_on_error = 0;
645                         break;
646                 default:
647                         drbd_err(connection, "%s failed, err = %d\n", what, err);
648                 }
649                 if (disconnect_on_error)
650                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
651         }
652
653         return sock;
654 }
655
656 struct accept_wait_data {
657         struct drbd_connection *connection;
658         struct socket *s_listen;
659         struct completion door_bell;
660         void (*original_sk_state_change)(struct sock *sk);
661
662 };
663
664 static void drbd_incoming_connection(struct sock *sk)
665 {
666         struct accept_wait_data *ad = sk->sk_user_data;
667         void (*state_change)(struct sock *sk);
668
669         state_change = ad->original_sk_state_change;
670         if (sk->sk_state == TCP_ESTABLISHED)
671                 complete(&ad->door_bell);
672         state_change(sk);
673 }
674
675 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
676 {
677         int err, sndbuf_size, rcvbuf_size, my_addr_len;
678         struct sockaddr_in6 my_addr;
679         struct socket *s_listen;
680         struct net_conf *nc;
681         const char *what;
682
683         rcu_read_lock();
684         nc = rcu_dereference(connection->net_conf);
685         if (!nc) {
686                 rcu_read_unlock();
687                 return -EIO;
688         }
689         sndbuf_size = nc->sndbuf_size;
690         rcvbuf_size = nc->rcvbuf_size;
691         rcu_read_unlock();
692
693         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
694         memcpy(&my_addr, &connection->my_addr, my_addr_len);
695
696         what = "sock_create_kern";
697         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
698                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
699         if (err) {
700                 s_listen = NULL;
701                 goto out;
702         }
703
704         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
705         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
706
707         what = "bind before listen";
708         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
709         if (err < 0)
710                 goto out;
711
712         ad->s_listen = s_listen;
713         write_lock_bh(&s_listen->sk->sk_callback_lock);
714         ad->original_sk_state_change = s_listen->sk->sk_state_change;
715         s_listen->sk->sk_state_change = drbd_incoming_connection;
716         s_listen->sk->sk_user_data = ad;
717         write_unlock_bh(&s_listen->sk->sk_callback_lock);
718
719         what = "listen";
720         err = s_listen->ops->listen(s_listen, 5);
721         if (err < 0)
722                 goto out;
723
724         return 0;
725 out:
726         if (s_listen)
727                 sock_release(s_listen);
728         if (err < 0) {
729                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
730                         drbd_err(connection, "%s failed, err = %d\n", what, err);
731                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
732                 }
733         }
734
735         return -EIO;
736 }
737
738 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
739 {
740         write_lock_bh(&sk->sk_callback_lock);
741         sk->sk_state_change = ad->original_sk_state_change;
742         sk->sk_user_data = NULL;
743         write_unlock_bh(&sk->sk_callback_lock);
744 }
745
746 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
747 {
748         int timeo, connect_int, err = 0;
749         struct socket *s_estab = NULL;
750         struct net_conf *nc;
751
752         rcu_read_lock();
753         nc = rcu_dereference(connection->net_conf);
754         if (!nc) {
755                 rcu_read_unlock();
756                 return NULL;
757         }
758         connect_int = nc->connect_int;
759         rcu_read_unlock();
760
761         timeo = connect_int * HZ;
762         /* 28.5% random jitter */
763         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
764
765         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
766         if (err <= 0)
767                 return NULL;
768
769         err = kernel_accept(ad->s_listen, &s_estab, 0);
770         if (err < 0) {
771                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
772                         drbd_err(connection, "accept failed, err = %d\n", err);
773                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
774                 }
775         }
776
777         if (s_estab)
778                 unregister_state_change(s_estab->sk, ad);
779
780         return s_estab;
781 }
782
783 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
784
785 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
786                              enum drbd_packet cmd)
787 {
788         if (!conn_prepare_command(connection, sock))
789                 return -EIO;
790         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
791 }
792
793 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
794 {
795         unsigned int header_size = drbd_header_size(connection);
796         struct packet_info pi;
797         struct net_conf *nc;
798         int err;
799
800         rcu_read_lock();
801         nc = rcu_dereference(connection->net_conf);
802         if (!nc) {
803                 rcu_read_unlock();
804                 return -EIO;
805         }
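        /* ping_timeo is configured in tenths of a second; allow four ping
         * timeouts (converted to jiffies) for the first packet to arrive. */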
806         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
807         rcu_read_unlock();
808
809         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
810         if (err != header_size) {
811                 if (err >= 0)
812                         err = -EIO;
813                 return err;
814         }
815         err = decode_header(connection, connection->data.rbuf, &pi);
816         if (err)
817                 return err;
818         return pi.cmd;
819 }
820
821 /**
822  * drbd_socket_okay() - Free the socket if its connection is not okay
823  * @sock:       pointer to the pointer to the socket.
824  */
825 static bool drbd_socket_okay(struct socket **sock)
826 {
827         int rr;
828         char tb[4];
829
830         if (!*sock)
831                 return false;
832
833         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
834
835         if (rr > 0 || rr == -EAGAIN) {
836                 return true;
837         } else {
838                 sock_release(*sock);
839                 *sock = NULL;
840                 return false;
841         }
842 }
843
844 static bool connection_established(struct drbd_connection *connection,
845                                    struct socket **sock1,
846                                    struct socket **sock2)
847 {
848         struct net_conf *nc;
849         int timeout;
850         bool ok;
851
852         if (!*sock1 || !*sock2)
853                 return false;
854
855         rcu_read_lock();
856         nc = rcu_dereference(connection->net_conf);
857         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
858         rcu_read_unlock();
859         schedule_timeout_interruptible(timeout);
860
861         ok = drbd_socket_okay(sock1);
862         ok = drbd_socket_okay(sock2) && ok;
863
864         return ok;
865 }
866
867 /* Gets called if a connection is established, or if a new minor gets created
868    in a connection */
869 int drbd_connected(struct drbd_peer_device *peer_device)
870 {
871         struct drbd_device *device = peer_device->device;
872         int err;
873
874         atomic_set(&device->packet_seq, 0);
875         device->peer_seq = 0;
876
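        /* Peers speaking a protocol older than 100 serialize state changes on the
         * connection-wide cstate_mutex; newer peers get a per-device state mutex. */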
877         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
878                 &peer_device->connection->cstate_mutex :
879                 &device->own_state_mutex;
880
881         err = drbd_send_sync_param(peer_device);
882         if (!err)
883                 err = drbd_send_sizes(peer_device, 0, 0);
884         if (!err)
885                 err = drbd_send_uuids(peer_device);
886         if (!err)
887                 err = drbd_send_current_state(peer_device);
888         clear_bit(USE_DEGR_WFC_T, &device->flags);
889         clear_bit(RESIZE_PENDING, &device->flags);
890         atomic_set(&device->ap_in_flight, 0);
891         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
892         return err;
893 }
894
895 /*
896  * return values:
897  *   1 yes, we have a valid connection
898  *   0 oops, did not work out, please try again
899  *  -1 peer talks different language,
900  *     no point in trying again, please go standalone.
901  *  -2 We do not have a network config...
902  */
903 static int conn_connect(struct drbd_connection *connection)
904 {
905         struct drbd_socket sock, msock;
906         struct drbd_peer_device *peer_device;
907         struct net_conf *nc;
908         int vnr, timeout, h;
909         bool discard_my_data, ok;
910         enum drbd_state_rv rv;
911         struct accept_wait_data ad = {
912                 .connection = connection,
913                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
914         };
915
916         clear_bit(DISCONNECT_SENT, &connection->flags);
917         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
918                 return -2;
919
920         mutex_init(&sock.mutex);
921         sock.sbuf = connection->data.sbuf;
922         sock.rbuf = connection->data.rbuf;
923         sock.socket = NULL;
924         mutex_init(&msock.mutex);
925         msock.sbuf = connection->meta.sbuf;
926         msock.rbuf = connection->meta.rbuf;
927         msock.socket = NULL;
928
929         /* Assume that the peer only understands protocol 80 until we know better.  */
930         connection->agreed_pro_version = 80;
931
932         if (prepare_listen_socket(connection, &ad))
933                 return 0;
934
935         do {
936                 struct socket *s;
937
938                 s = drbd_try_connect(connection);
939                 if (s) {
940                         if (!sock.socket) {
941                                 sock.socket = s;
942                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
943                         } else if (!msock.socket) {
944                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
945                                 msock.socket = s;
946                                 send_first_packet(connection, &msock, P_INITIAL_META);
947                         } else {
948                                 drbd_err(connection, "Logic error in conn_connect()\n");
949                                 goto out_release_sockets;
950                         }
951                 }
952
953                 if (connection_established(connection, &sock.socket, &msock.socket))
954                         break;
955
956 retry:
957                 s = drbd_wait_for_connect(connection, &ad);
958                 if (s) {
959                         int fp = receive_first_packet(connection, s);
960                         drbd_socket_okay(&sock.socket);
961                         drbd_socket_okay(&msock.socket);
962                         switch (fp) {
963                         case P_INITIAL_DATA:
964                                 if (sock.socket) {
965                                         drbd_warn(connection, "initial packet S crossed\n");
966                                         sock_release(sock.socket);
967                                         sock.socket = s;
968                                         goto randomize;
969                                 }
970                                 sock.socket = s;
971                                 break;
972                         case P_INITIAL_META:
973                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
974                                 if (msock.socket) {
975                                         drbd_warn(connection, "initial packet M crossed\n");
976                                         sock_release(msock.socket);
977                                         msock.socket = s;
978                                         goto randomize;
979                                 }
980                                 msock.socket = s;
981                                 break;
982                         default:
983                                 drbd_warn(connection, "Error receiving initial packet\n");
984                                 sock_release(s);
985 randomize:
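                                /* Coin flip: either wait for another incoming
                                 * connection right away, or fall through and take
                                 * another pass through the outer loop, so both
                                 * peers do not repeatedly make the same choice. */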
986                                 if (prandom_u32() & 1)
987                                         goto retry;
988                         }
989                 }
990
991                 if (connection->cstate <= C_DISCONNECTING)
992                         goto out_release_sockets;
993                 if (signal_pending(current)) {
994                         flush_signals(current);
995                         smp_rmb();
996                         if (get_t_state(&connection->receiver) == EXITING)
997                                 goto out_release_sockets;
998                 }
999
1000                 ok = connection_established(connection, &sock.socket, &msock.socket);
1001         } while (!ok);
1002
1003         if (ad.s_listen)
1004                 sock_release(ad.s_listen);
1005
1006         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1007         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1008
1009         sock.socket->sk->sk_allocation = GFP_NOIO;
1010         msock.socket->sk->sk_allocation = GFP_NOIO;
1011
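        /* The data socket carries the bulk writes; the meta socket carries acks
         * and pings and therefore gets the higher interactive TC priority. */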
1012         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1013         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1014
1015         /* NOT YET ...
1016          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1017          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1018          * first set it to the P_CONNECTION_FEATURES timeout,
1019          * which we set to 4x the configured ping_timeout. */
1020         rcu_read_lock();
1021         nc = rcu_dereference(connection->net_conf);
1022
1023         sock.socket->sk->sk_sndtimeo =
1024         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1025
1026         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1027         timeout = nc->timeout * HZ / 10;
1028         discard_my_data = nc->discard_my_data;
1029         rcu_read_unlock();
1030
1031         msock.socket->sk->sk_sndtimeo = timeout;
1032
1033         /* we don't want delays.
1034          * we use TCP_CORK where appropriate, though */
1035         drbd_tcp_nodelay(sock.socket);
1036         drbd_tcp_nodelay(msock.socket);
1037
1038         connection->data.socket = sock.socket;
1039         connection->meta.socket = msock.socket;
1040         connection->last_received = jiffies;
1041
1042         h = drbd_do_features(connection);
1043         if (h <= 0)
1044                 return h;
1045
1046         if (connection->cram_hmac_tfm) {
1047                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1048                 switch (drbd_do_auth(connection)) {
1049                 case -1:
1050                         drbd_err(connection, "Authentication of peer failed\n");
1051                         return -1;
1052                 case 0:
1053                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1054                         return 0;
1055                 }
1056         }
1057
1058         connection->data.socket->sk->sk_sndtimeo = timeout;
1059         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1060
1061         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1062                 return -1;
1063
1064         /* Prevent a race between resync-handshake and
1065          * being promoted to Primary.
1066          *
1067          * Grab and release the state mutex, so we know that any current
1068          * drbd_set_role() is finished, and any incoming drbd_set_role
1069          * will see the STATE_SENT flag, and wait for it to be cleared.
1070          */
1071         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1072                 mutex_lock(peer_device->device->state_mutex);
1073
1074         set_bit(STATE_SENT, &connection->flags);
1075
1076         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1077                 mutex_unlock(peer_device->device->state_mutex);
1078
1079         rcu_read_lock();
1080         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1081                 struct drbd_device *device = peer_device->device;
1082                 kref_get(&device->kref);
1083                 rcu_read_unlock();
1084
1085                 if (discard_my_data)
1086                         set_bit(DISCARD_MY_DATA, &device->flags);
1087                 else
1088                         clear_bit(DISCARD_MY_DATA, &device->flags);
1089
1090                 drbd_connected(peer_device);
1091                 kref_put(&device->kref, drbd_destroy_device);
1092                 rcu_read_lock();
1093         }
1094         rcu_read_unlock();
1095
1096         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1097         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1098                 clear_bit(STATE_SENT, &connection->flags);
1099                 return 0;
1100         }
1101
1102         drbd_thread_start(&connection->asender);
1103
1104         mutex_lock(&connection->resource->conf_update);
1105         /* The discard_my_data flag is a single-shot modifier to the next
1106          * connection attempt, the handshake of which is now well underway.
1107          * No need for rcu style copying of the whole struct
1108          * just to clear a single value. */
1109         connection->net_conf->discard_my_data = 0;
1110         mutex_unlock(&connection->resource->conf_update);
1111
1112         return h;
1113
1114 out_release_sockets:
1115         if (ad.s_listen)
1116                 sock_release(ad.s_listen);
1117         if (sock.socket)
1118                 sock_release(sock.socket);
1119         if (msock.socket)
1120                 sock_release(msock.socket);
1121         return -1;
1122 }
1123
1124 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1125 {
1126         unsigned int header_size = drbd_header_size(connection);
1127
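        /* Three on-wire header formats are possible, distinguished by the agreed
         * header size and magic: p_header100 (protocol 100+, carries a volume
         * number), p_header95 (DRBD_MAGIC_BIG, 32 bit length) and p_header80. */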
1128         if (header_size == sizeof(struct p_header100) &&
1129             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1130                 struct p_header100 *h = header;
1131                 if (h->pad != 0) {
1132                         drbd_err(connection, "Header padding is not zero\n");
1133                         return -EINVAL;
1134                 }
1135                 pi->vnr = be16_to_cpu(h->volume);
1136                 pi->cmd = be16_to_cpu(h->command);
1137                 pi->size = be32_to_cpu(h->length);
1138         } else if (header_size == sizeof(struct p_header95) &&
1139                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1140                 struct p_header95 *h = header;
1141                 pi->cmd = be16_to_cpu(h->command);
1142                 pi->size = be32_to_cpu(h->length);
1143                 pi->vnr = 0;
1144         } else if (header_size == sizeof(struct p_header80) &&
1145                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1146                 struct p_header80 *h = header;
1147                 pi->cmd = be16_to_cpu(h->command);
1148                 pi->size = be16_to_cpu(h->length);
1149                 pi->vnr = 0;
1150         } else {
1151                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1152                          be32_to_cpu(*(__be32 *)header),
1153                          connection->agreed_pro_version);
1154                 return -EINVAL;
1155         }
1156         pi->data = header + header_size;
1157         return 0;
1158 }
1159
1160 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1161 {
1162         void *buffer = connection->data.rbuf;
1163         int err;
1164
1165         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1166         if (err)
1167                 return err;
1168
1169         err = decode_header(connection, buffer, pi);
1170         connection->last_received = jiffies;
1171
1172         return err;
1173 }
1174
1175 static void drbd_flush(struct drbd_connection *connection)
1176 {
1177         int rv;
1178         struct drbd_peer_device *peer_device;
1179         int vnr;
1180
1181         if (connection->resource->write_ordering >= WO_bdev_flush) {
1182                 rcu_read_lock();
1183                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1184                         struct drbd_device *device = peer_device->device;
1185
1186                         if (!get_ldev(device))
1187                                 continue;
1188                         kref_get(&device->kref);
1189                         rcu_read_unlock();
1190
1191                         /* Right now, we have only this one synchronous code path
1192                          * for flushes between request epochs.
1193                          * We may want to make those asynchronous,
1194                          * or at least parallelize the flushes to the volume devices.
1195                          */
1196                         device->flush_jif = jiffies;
1197                         set_bit(FLUSH_PENDING, &device->flags);
1198                         rv = blkdev_issue_flush(device->ldev->backing_bdev,
1199                                         GFP_NOIO, NULL);
1200                         clear_bit(FLUSH_PENDING, &device->flags);
1201                         if (rv) {
1202                                 drbd_info(device, "local disk flush failed with status %d\n", rv);
1203                                 /* would rather check on EOPNOTSUPP, but that is not reliable.
1204                                  * don't try again for ANY return value != 0
1205                                  * if (rv == -EOPNOTSUPP) */
1206                                 drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
1207                         }
1208                         put_ldev(device);
1209                         kref_put(&device->kref, drbd_destroy_device);
1210
1211                         rcu_read_lock();
1212                         if (rv)
1213                                 break;
1214                 }
1215                 rcu_read_unlock();
1216         }
1217 }
1218
1219 /**
1220  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1221  * @device:     DRBD device.
1222  * @epoch:      Epoch object.
1223  * @ev:         Epoch event.
1224  */
1225 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1226                                                struct drbd_epoch *epoch,
1227                                                enum epoch_event ev)
1228 {
1229         int epoch_size;
1230         struct drbd_epoch *next_epoch;
1231         enum finish_epoch rv = FE_STILL_LIVE;
1232
1233         spin_lock(&connection->epoch_lock);
1234         do {
1235                 next_epoch = NULL;
1236
1237                 epoch_size = atomic_read(&epoch->epoch_size);
1238
1239                 switch (ev & ~EV_CLEANUP) {
1240                 case EV_PUT:
1241                         atomic_dec(&epoch->active);
1242                         break;
1243                 case EV_GOT_BARRIER_NR:
1244                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1245                         break;
1246                 case EV_BECAME_LAST:
1247                         /* nothing to do */
1248                         break;
1249                 }
1250
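                /* An epoch can be finished once it contains at least one request,
                 * none of its writes are still in flight, and its barrier number
                 * is known (or we are cleaning up anyway). */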
1251                 if (epoch_size != 0 &&
1252                     atomic_read(&epoch->active) == 0 &&
1253                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1254                         if (!(ev & EV_CLEANUP)) {
1255                                 spin_unlock(&connection->epoch_lock);
1256                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1257                                 spin_lock(&connection->epoch_lock);
1258                         }
1259 #if 0
1260                         /* FIXME: dec unacked on connection, once we have
1261                          * something to count pending connection packets in. */
1262                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1263                                 dec_unacked(epoch->connection);
1264 #endif
1265
1266                         if (connection->current_epoch != epoch) {
1267                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1268                                 list_del(&epoch->list);
1269                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1270                                 connection->epochs--;
1271                                 kfree(epoch);
1272
1273                                 if (rv == FE_STILL_LIVE)
1274                                         rv = FE_DESTROYED;
1275                         } else {
1276                                 epoch->flags = 0;
1277                                 atomic_set(&epoch->epoch_size, 0);
1278                                 /* atomic_set(&epoch->active, 0); is already zero */
1279                                 if (rv == FE_STILL_LIVE)
1280                                         rv = FE_RECYCLED;
1281                         }
1282                 }
1283
1284                 if (!next_epoch)
1285                         break;
1286
1287                 epoch = next_epoch;
1288         } while (1);
1289
1290         spin_unlock(&connection->epoch_lock);
1291
1292         return rv;
1293 }
1294
1295 static enum write_ordering_e
1296 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1297 {
1298         struct disk_conf *dc;
1299
1300         dc = rcu_dereference(bdev->disk_conf);
1301
1302         if (wo == WO_bdev_flush && !dc->disk_flushes)
1303                 wo = WO_drain_io;
1304         if (wo == WO_drain_io && !dc->disk_drain)
1305                 wo = WO_none;
1306
1307         return wo;
1308 }
1309
1310 /**
1311  * drbd_bump_write_ordering() - Fall back to another write ordering method
1312  * @resource:   DRBD resource.
1313  * @wo:         Write ordering method to try.
1314  */
1315 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1316                               enum write_ordering_e wo)
1317 {
1318         struct drbd_device *device;
1319         enum write_ordering_e pwo;
1320         int vnr;
1321         static char *write_ordering_str[] = {
1322                 [WO_none] = "none",
1323                 [WO_drain_io] = "drain",
1324                 [WO_bdev_flush] = "flush",
1325         };
1326
1327         pwo = resource->write_ordering;
1328         if (wo != WO_bdev_flush)
1329                 wo = min(pwo, wo);
1330         rcu_read_lock();
1331         idr_for_each_entry(&resource->devices, device, vnr) {
1332                 if (get_ldev(device)) {
1333                         wo = max_allowed_wo(device->ldev, wo);
1334                         if (device->ldev == bdev)
1335                                 bdev = NULL;
1336                         put_ldev(device);
1337                 }
1338         }
1339
1340         if (bdev)
1341                 wo = max_allowed_wo(bdev, wo);
1342
1343         rcu_read_unlock();
1344
1345         resource->write_ordering = wo;
1346         if (pwo != resource->write_ordering || wo == WO_bdev_flush)
1347                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1348 }
1349
1350 /**
1351  * drbd_submit_peer_request() - submit all bios of a peer request
1352  * @device:     DRBD device.
1353  * @peer_req:   peer request
1354  * @rw:         flag field, see bio->bi_rw
1355  *
1356  * May spread the pages to multiple bios,
1357  * depending on bio_add_page restrictions.
1358  *
1359  * Returns 0 if all bios have been submitted,
1360  * -ENOMEM if we could not allocate enough bios,
1361  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1362  *  single page to an empty bio (which should never happen and likely indicates
1363  *  that the lower level IO stack is in some way broken). This has been observed
1364  *  on certain Xen deployments.
1365  */
1366 /* TODO allocate from our own bio_set. */
1367 int drbd_submit_peer_request(struct drbd_device *device,
1368                              struct drbd_peer_request *peer_req,
1369                              const unsigned rw, const int fault_type)
1370 {
1371         struct bio *bios = NULL;
1372         struct bio *bio;
1373         struct page *page = peer_req->pages;
1374         sector_t sector = peer_req->i.sector;
1375         unsigned data_size = peer_req->i.size;
1376         unsigned n_bios = 0;
1377         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1378         int err = -ENOMEM;
1379
1380         if (peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) {
1381                 /* wait for all pending IO completions, before we start
1382                  * zeroing things out. */
1383                 conn_wait_active_ee_empty(first_peer_device(device)->connection);
1384                 /* add it to the active list now,
1385                  * so we can find it to present it in debugfs */
1386                 peer_req->submit_jif = jiffies;
1387                 peer_req->flags |= EE_SUBMITTED;
1388                 spin_lock_irq(&device->resource->req_lock);
1389                 list_add_tail(&peer_req->w.list, &device->active_ee);
1390                 spin_unlock_irq(&device->resource->req_lock);
1391                 if (blkdev_issue_zeroout(device->ldev->backing_bdev,
1392                         sector, data_size >> 9, GFP_NOIO, false))
1393                         peer_req->flags |= EE_WAS_ERROR;
1394                 drbd_endio_write_sec_final(peer_req);
1395                 return 0;
1396         }
1397
1398         /* Discards don't have any payload.
1399          * But the scsi layer still expects a bio_vec it can use internally,
1400          * see sd_setup_discard_cmnd() and blk_add_request_payload(). */
1401         if (peer_req->flags & EE_IS_TRIM)
1402                 nr_pages = 1;
1403
1404         /* In most cases, we will only need one bio.  But in case the lower
1405          * level restrictions happen to be different at this offset on this
1406          * side than those of the sending peer, we may need to submit the
1407          * request in more than one bio.
1408          *
1409          * Plain bio_alloc is good enough here, this is no DRBD internally
1410          * generated bio, but a bio allocated on behalf of the peer.
1411          */
1412 next_bio:
1413         bio = bio_alloc(GFP_NOIO, nr_pages);
1414         if (!bio) {
1415                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1416                 goto fail;
1417         }
1418         /* > peer_req->i.sector, unless this is the first bio */
1419         bio->bi_iter.bi_sector = sector;
1420         bio->bi_bdev = device->ldev->backing_bdev;
1421         bio->bi_rw = rw;
1422         bio->bi_private = peer_req;
1423         bio->bi_end_io = drbd_peer_request_endio;
1424
1425         bio->bi_next = bios;
1426         bios = bio;
1427         ++n_bios;
1428
1429         if (rw & REQ_DISCARD) {
1430                 bio->bi_iter.bi_size = data_size;
1431                 goto submit;
1432         }
1433
1434         page_chain_for_each(page) {
1435                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1436                 if (!bio_add_page(bio, page, len, 0)) {
1437                         /* A single page must always be possible!
1438                          * But in case it fails anyways,
1439                          * we deal with it, and complain (below). */
1440                         if (bio->bi_vcnt == 0) {
1441                                 drbd_err(device,
1442                                         "bio_add_page failed for len=%u, "
1443                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1444                                         len, (uint64_t)bio->bi_iter.bi_sector);
1445                                 err = -ENOSPC;
1446                                 goto fail;
1447                         }
1448                         goto next_bio;
1449                 }
1450                 data_size -= len;
1451                 sector += len >> 9;
1452                 --nr_pages;
1453         }
1454         D_ASSERT(device, data_size == 0);
1455 submit:
1456         D_ASSERT(device, page == NULL);
1457
1458         atomic_set(&peer_req->pending_bios, n_bios);
1459         /* for debugfs: update timestamp, mark as submitted */
1460         peer_req->submit_jif = jiffies;
1461         peer_req->flags |= EE_SUBMITTED;
1462         do {
1463                 bio = bios;
1464                 bios = bios->bi_next;
1465                 bio->bi_next = NULL;
1466
1467                 drbd_generic_make_request(device, fault_type, bio);
1468         } while (bios);
1469         return 0;
1470
1471 fail:
1472         while (bios) {
1473                 bio = bios;
1474                 bios = bios->bi_next;
1475                 bio_put(bio);
1476         }
1477         return err;
1478 }
1479
1480 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1481                                              struct drbd_peer_request *peer_req)
1482 {
1483         struct drbd_interval *i = &peer_req->i;
1484
1485         drbd_remove_interval(&device->write_requests, i);
1486         drbd_clear_interval(i);
1487
1488         /* Wake up any processes waiting for this peer request to complete.  */
1489         if (i->waiting)
1490                 wake_up(&device->misc_wait);
1491 }
1492
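/* Wait until the active_ee list of every volume on this connection has
 * drained.  We drop the RCU read lock while sleeping, so hold a kref on
 * each device to keep it from going away under us. */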
1493 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1494 {
1495         struct drbd_peer_device *peer_device;
1496         int vnr;
1497
1498         rcu_read_lock();
1499         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1500                 struct drbd_device *device = peer_device->device;
1501
1502                 kref_get(&device->kref);
1503                 rcu_read_unlock();
1504                 drbd_wait_ee_list_empty(device, &device->active_ee);
1505                 kref_put(&device->kref, drbd_destroy_device);
1506                 rcu_read_lock();
1507         }
1508         rcu_read_unlock();
1509 }
1510
1511 static struct drbd_peer_device *
1512 conn_peer_device(struct drbd_connection *connection, int volume_number)
1513 {
1514         return idr_find(&connection->peer_devices, volume_number);
1515 }
1516
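/* A P_BARRIER from the peer closes the current write epoch.  Depending on
 * the configured write ordering we either allocate a fresh epoch object,
 * or first drain and flush the outstanding writes; the barrier ack itself
 * is only sent once the epoch has been completed. */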
1517 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1518 {
1519         int rv;
1520         struct p_barrier *p = pi->data;
1521         struct drbd_epoch *epoch;
1522
1523         /* FIXME these are unacked on connection,
1524          * not a specific (peer)device.
1525          */
1526         connection->current_epoch->barrier_nr = p->barrier;
1527         connection->current_epoch->connection = connection;
1528         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1529
1530         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1531          * the activity log, which means it would not be resynced in case the
1532          * R_PRIMARY crashes now.
1533          * Therefore we must send the barrier_ack after the barrier request was
1534          * completed. */
1535         switch (connection->resource->write_ordering) {
1536         case WO_none:
1537                 if (rv == FE_RECYCLED)
1538                         return 0;
1539
1540                 /* receiver context, in the writeout path of the other node.
1541                  * avoid potential distributed deadlock */
1542                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1543                 if (epoch)
1544                         break;
1545                 else
1546                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1547                         /* Fall through */
1548
1549         case WO_bdev_flush:
1550         case WO_drain_io:
1551                 conn_wait_active_ee_empty(connection);
1552                 drbd_flush(connection);
1553
1554                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1555                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1556                         if (epoch)
1557                                 break;
1558                 }
1559
1560                 return 0;
1561         default:
1562                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1563                          connection->resource->write_ordering);
1564                 return -EIO;
1565         }
1566
1567         epoch->flags = 0;
1568         atomic_set(&epoch->epoch_size, 0);
1569         atomic_set(&epoch->active, 0);
1570
1571         spin_lock(&connection->epoch_lock);
1572         if (atomic_read(&connection->current_epoch->epoch_size)) {
1573                 list_add(&epoch->list, &connection->current_epoch->list);
1574                 connection->current_epoch = epoch;
1575                 connection->epochs++;
1576         } else {
1577                 /* The current_epoch got recycled while we allocated this one... */
1578                 kfree(epoch);
1579         }
1580         spin_unlock(&connection->epoch_lock);
1581
1582         return 0;
1583 }
1584
1585 /* used from receive_RSDataReply (recv_resync_read)
1586  * and from receive_Data */
1587 static struct drbd_peer_request *
1588 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1589               struct packet_info *pi) __must_hold(local)
1590 {
1591         struct drbd_device *device = peer_device->device;
1592         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1593         struct drbd_peer_request *peer_req;
1594         struct page *page;
1595         int digest_size, err;
1596         unsigned int data_size = pi->size, ds;
1597         void *dig_in = peer_device->connection->int_dig_in;
1598         void *dig_vv = peer_device->connection->int_dig_vv;
1599         unsigned long *data;
1600         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1601
1602         digest_size = 0;
1603         if (!trim && peer_device->connection->peer_integrity_tfm) {
1604                 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1605                 /*
1606                  * FIXME: Receive the incoming digest into the receive buffer
1607                  *        here, together with its struct p_data?
1608                  */
1609                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1610                 if (err)
1611                         return NULL;
1612                 data_size -= digest_size;
1613         }
1614
1615         if (trim) {
1616                 D_ASSERT(peer_device, data_size == 0);
1617                 data_size = be32_to_cpu(trim->size);
1618         }
1619
1620         if (!expect(IS_ALIGNED(data_size, 512)))
1621                 return NULL;
1622         /* prepare for larger trim requests. */
1623         if (!trim && !expect(data_size <= DRBD_MAX_BIO_SIZE))
1624                 return NULL;
1625
1626         /* even though we trust our peer,
1627          * we sometimes have to double check. */
1628         if (sector + (data_size>>9) > capacity) {
1629                 drbd_err(device, "request from peer beyond end of local disk: "
1630                         "capacity: %llus < sector: %llus + size: %u\n",
1631                         (unsigned long long)capacity,
1632                         (unsigned long long)sector, data_size);
1633                 return NULL;
1634         }
1635
1636         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1637          * "criss-cross" setup, that might cause write-out on some other DRBD,
1638          * which in turn might block on the other node at this very place.  */
1639         peer_req = drbd_alloc_peer_req(peer_device, id, sector, data_size, trim == NULL, GFP_NOIO);
1640         if (!peer_req)
1641                 return NULL;
1642
1643         peer_req->flags |= EE_WRITE;
1644         if (trim)
1645                 return peer_req;
1646
1647         ds = data_size;
1648         page = peer_req->pages;
1649         page_chain_for_each(page) {
1650                 unsigned len = min_t(int, ds, PAGE_SIZE);
1651                 data = kmap(page);
1652                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1653                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1654                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1655                         data[0] = data[0] ^ (unsigned long)-1;
1656                 }
1657                 kunmap(page);
1658                 if (err) {
1659                         drbd_free_peer_req(device, peer_req);
1660                         return NULL;
1661                 }
1662                 ds -= len;
1663         }
1664
1665         if (digest_size) {
1666                 drbd_csum_ee(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv);
1667                 if (memcmp(dig_in, dig_vv, digest_size)) {
1668                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1669                                 (unsigned long long)sector, data_size);
1670                         drbd_free_peer_req(device, peer_req);
1671                         return NULL;
1672                 }
1673         }
1674         device->recv_cnt += data_size >> 9;
1675         return peer_req;
1676 }
1677
1678 /* drbd_drain_block() just takes a data block
1679  * out of the socket input buffer, and discards it.
1680  */
1681 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1682 {
1683         struct page *page;
1684         int err = 0;
1685         void *data;
1686
1687         if (!data_size)
1688                 return 0;
1689
1690         page = drbd_alloc_pages(peer_device, 1, 1);
1691
1692         data = kmap(page);
1693         while (data_size) {
1694                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1695
1696                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1697                 if (err)
1698                         break;
1699                 data_size -= len;
1700         }
1701         kunmap(page);
1702         drbd_free_pages(peer_device->device, page, 0);
1703         return err;
1704 }
1705
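/* Receive the payload of a data reply the peer served on our behalf:
 * copy it directly into the pages of the original request's master bio,
 * verifying the peer data integrity digest if one is configured. */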
1706 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1707                            sector_t sector, int data_size)
1708 {
1709         struct bio_vec bvec;
1710         struct bvec_iter iter;
1711         struct bio *bio;
1712         int digest_size, err, expect;
1713         void *dig_in = peer_device->connection->int_dig_in;
1714         void *dig_vv = peer_device->connection->int_dig_vv;
1715
1716         digest_size = 0;
1717         if (peer_device->connection->peer_integrity_tfm) {
1718                 digest_size = crypto_hash_digestsize(peer_device->connection->peer_integrity_tfm);
1719                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1720                 if (err)
1721                         return err;
1722                 data_size -= digest_size;
1723         }
1724
1725         /* optimistically update recv_cnt.  if receiving fails below,
1726          * we disconnect anyways, and counters will be reset. */
1727         peer_device->device->recv_cnt += data_size>>9;
1728
1729         bio = req->master_bio;
1730         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1731
1732         bio_for_each_segment(bvec, bio, iter) {
1733                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1734                 expect = min_t(int, data_size, bvec.bv_len);
1735                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1736                 kunmap(bvec.bv_page);
1737                 if (err)
1738                         return err;
1739                 data_size -= expect;
1740         }
1741
1742         if (digest_size) {
1743                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1744                 if (memcmp(dig_in, dig_vv, digest_size)) {
1745                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1746                         return -EINVAL;
1747                 }
1748         }
1749
1750         D_ASSERT(peer_device->device, data_size == 0);
1751         return 0;
1752 }
1753
1754 /*
1755  * e_end_resync_block() is called in asender context via
1756  * drbd_finish_peer_reqs().
1757  */
1758 static int e_end_resync_block(struct drbd_work *w, int unused)
1759 {
1760         struct drbd_peer_request *peer_req =
1761                 container_of(w, struct drbd_peer_request, w);
1762         struct drbd_peer_device *peer_device = peer_req->peer_device;
1763         struct drbd_device *device = peer_device->device;
1764         sector_t sector = peer_req->i.sector;
1765         int err;
1766
1767         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1768
1769         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1770                 drbd_set_in_sync(device, sector, peer_req->i.size);
1771                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
1772         } else {
1773                 /* Record failure to sync */
1774                 drbd_rs_failed_io(device, sector, peer_req->i.size);
1775
1776                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1777         }
1778         dec_unacked(device);
1779
1780         return err;
1781 }
1782
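/* Receive a resync write: read the payload into a fresh peer request,
 * queue it on sync_ee and submit it to the local disk.  The ack
 * (P_RS_WRITE_ACK or P_NEG_ACK) is sent later from e_end_resync_block(). */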
1783 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
1784                             struct packet_info *pi) __releases(local)
1785 {
1786         struct drbd_device *device = peer_device->device;
1787         struct drbd_peer_request *peer_req;
1788
1789         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
1790         if (!peer_req)
1791                 goto fail;
1792
1793         dec_rs_pending(device);
1794
1795         inc_unacked(device);
1796         /* corresponding dec_unacked() in e_end_resync_block(),
1797          * or in _drbd_clear_done_ee(), respectively */
1798
1799         peer_req->w.cb = e_end_resync_block;
1800         peer_req->submit_jif = jiffies;
1801
1802         spin_lock_irq(&device->resource->req_lock);
1803         list_add_tail(&peer_req->w.list, &device->sync_ee);
1804         spin_unlock_irq(&device->resource->req_lock);
1805
1806         atomic_add(pi->size >> 9, &device->rs_sect_ev);
1807         if (drbd_submit_peer_request(device, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1808                 return 0;
1809
1810         /* don't care for the reason here */
1811         drbd_err(device, "submit failed, triggering re-connect\n");
1812         spin_lock_irq(&device->resource->req_lock);
1813         list_del(&peer_req->w.list);
1814         spin_unlock_irq(&device->resource->req_lock);
1815
1816         drbd_free_peer_req(device, peer_req);
1817 fail:
1818         put_ldev(device);
1819         return -EIO;
1820 }
1821
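/* The peer echoes our own request pointer back to us as the block id.
 * Make sure it still refers to a request present in the given interval
 * tree before trusting it. */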
1822 static struct drbd_request *
1823 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
1824              sector_t sector, bool missing_ok, const char *func)
1825 {
1826         struct drbd_request *req;
1827
1828         /* Request object according to our peer */
1829         req = (struct drbd_request *)(unsigned long)id;
1830         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1831                 return req;
1832         if (!missing_ok) {
1833                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
1834                         (unsigned long)id, (unsigned long long)sector);
1835         }
1836         return NULL;
1837 }
1838
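/* Reply to one of our data requests: look up the original read request
 * via the echoed block id and copy the payload into its master bio. */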
1839 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
1840 {
1841         struct drbd_peer_device *peer_device;
1842         struct drbd_device *device;
1843         struct drbd_request *req;
1844         sector_t sector;
1845         int err;
1846         struct p_data *p = pi->data;
1847
1848         peer_device = conn_peer_device(connection, pi->vnr);
1849         if (!peer_device)
1850                 return -EIO;
1851         device = peer_device->device;
1852
1853         sector = be64_to_cpu(p->sector);
1854
1855         spin_lock_irq(&device->resource->req_lock);
1856         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
1857         spin_unlock_irq(&device->resource->req_lock);
1858         if (unlikely(!req))
1859                 return -EIO;
1860
1861         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1862          * special casing it there for the various failure cases.
1863          * still no race with drbd_fail_pending_reads */
1864         err = recv_dless_read(peer_device, req, sector, pi->size);
1865         if (!err)
1866                 req_mod(req, DATA_RECEIVED);
1867         /* else: nothing. handled from drbd_disconnect...
1868          * I don't think we may complete this just yet
1869          * in case we are "on-disconnect: freeze" */
1870
1871         return err;
1872 }
1873
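/* Resync data sent by the peer in reply to one of our resync read
 * requests: write it to the local disk if we can, otherwise drain the
 * payload and send a negative ack. */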
1874 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
1875 {
1876         struct drbd_peer_device *peer_device;
1877         struct drbd_device *device;
1878         sector_t sector;
1879         int err;
1880         struct p_data *p = pi->data;
1881
1882         peer_device = conn_peer_device(connection, pi->vnr);
1883         if (!peer_device)
1884                 return -EIO;
1885         device = peer_device->device;
1886
1887         sector = be64_to_cpu(p->sector);
1888         D_ASSERT(device, p->block_id == ID_SYNCER);
1889
1890         if (get_ldev(device)) {
1891                 /* data is submitted to disk within recv_resync_read.
1892                  * corresponding put_ldev done below on error,
1893                  * or in drbd_peer_request_endio. */
1894                 err = recv_resync_read(peer_device, sector, pi);
1895         } else {
1896                 if (__ratelimit(&drbd_ratelimit_state))
1897                         drbd_err(device, "Can not write resync data to local disk.\n");
1898
1899                 err = drbd_drain_block(peer_device, pi->size);
1900
1901                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
1902         }
1903
1904         atomic_add(pi->size >> 9, &device->rs_sect_in);
1905
1906         return err;
1907 }
1908
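/* Re-queue local writes that were postponed because they conflicted with
 * the peer request covering [sector, sector + size).
 * Called with req_lock held. */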
1909 static void restart_conflicting_writes(struct drbd_device *device,
1910                                        sector_t sector, int size)
1911 {
1912         struct drbd_interval *i;
1913         struct drbd_request *req;
1914
1915         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
1916                 if (!i->local)
1917                         continue;
1918                 req = container_of(i, struct drbd_request, i);
1919                 if (req->rq_state & RQ_LOCAL_PENDING ||
1920                     !(req->rq_state & RQ_POSTPONED))
1921                         continue;
1922                 /* as it is RQ_POSTPONED, this will cause it to
1923                  * be queued on the retry workqueue. */
1924                 __req_mod(req, CONFLICT_RESOLVED, NULL);
1925         }
1926 }
1927
1928 /*
1929  * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1930  */
1931 static int e_end_block(struct drbd_work *w, int cancel)
1932 {
1933         struct drbd_peer_request *peer_req =
1934                 container_of(w, struct drbd_peer_request, w);
1935         struct drbd_peer_device *peer_device = peer_req->peer_device;
1936         struct drbd_device *device = peer_device->device;
1937         sector_t sector = peer_req->i.sector;
1938         int err = 0, pcmd;
1939
1940         if (peer_req->flags & EE_SEND_WRITE_ACK) {
1941                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1942                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
1943                                 device->state.conn <= C_PAUSED_SYNC_T &&
1944                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1945                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1946                         err = drbd_send_ack(peer_device, pcmd, peer_req);
1947                         if (pcmd == P_RS_WRITE_ACK)
1948                                 drbd_set_in_sync(device, sector, peer_req->i.size);
1949                 } else {
1950                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
1951                         /* we expect it to be marked out of sync anyways...
1952                          * maybe assert this?  */
1953                 }
1954                 dec_unacked(device);
1955         }
1956
1957         /* we delete from the conflict detection hash _after_ we sent out the
1958          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1959         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1960                 spin_lock_irq(&device->resource->req_lock);
1961                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
1962                 drbd_remove_epoch_entry_interval(device, peer_req);
1963                 if (peer_req->flags & EE_RESTART_REQUESTS)
1964                         restart_conflicting_writes(device, sector, peer_req->i.size);
1965                 spin_unlock_irq(&device->resource->req_lock);
1966         } else
1967                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
1968
1969         drbd_may_finish_epoch(first_peer_device(device)->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1970
1971         return err;
1972 }
1973
1974 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1975 {
1976         struct drbd_peer_request *peer_req =
1977                 container_of(w, struct drbd_peer_request, w);
1978         struct drbd_peer_device *peer_device = peer_req->peer_device;
1979         int err;
1980
1981         err = drbd_send_ack(peer_device, ack, peer_req);
1982         dec_unacked(peer_device->device);
1983
1984         return err;
1985 }
1986
1987 static int e_send_superseded(struct drbd_work *w, int unused)
1988 {
1989         return e_send_ack(w, P_SUPERSEDED);
1990 }
1991
1992 static int e_send_retry_write(struct drbd_work *w, int unused)
1993 {
1994         struct drbd_peer_request *peer_req =
1995                 container_of(w, struct drbd_peer_request, w);
1996         struct drbd_connection *connection = peer_req->peer_device->connection;
1997
1998         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
1999                              P_RETRY_WRITE : P_SUPERSEDED);
2000 }
2001
2002 static bool seq_greater(u32 a, u32 b)
2003 {
2004         /*
2005          * We assume 32-bit wrap-around here.
2006          * For 24-bit wrap-around, we would have to shift:
2007          *  a <<= 8; b <<= 8;
2008          */
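        /* e.g. seq_greater(1, 0xfffffffe) is true:
         * (s32)(1 - 0xfffffffe) == 3 > 0, even though 1 < 0xfffffffe as u32. */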
2009         return (s32)a - (s32)b > 0;
2010 }
2011
2012 static u32 seq_max(u32 a, u32 b)
2013 {
2014         return seq_greater(a, b) ? a : b;
2015 }
2016
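/* Track the newest sequence number seen from the peer.  Only done when
 * RESOLVE_CONFLICTS is set; waiters in wait_for_and_update_peer_seq()
 * are woken up whenever the value actually advances. */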
2017 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2018 {
2019         struct drbd_device *device = peer_device->device;
2020         unsigned int newest_peer_seq;
2021
2022         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2023                 spin_lock(&device->peer_seq_lock);
2024                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2025                 device->peer_seq = newest_peer_seq;
2026                 spin_unlock(&device->peer_seq_lock);
2027                 /* wake up only if we actually changed device->peer_seq */
2028                 if (peer_seq == newest_peer_seq)
2029                         wake_up(&device->seq_wait);
2030         }
2031 }
2032
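/* True if the two requests overlap; the sectors s1 and s2 are 512 byte
 * units, the lengths l1 and l2 are in bytes. */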
2033 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2034 {
2035         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2036 }
2037
2038 /* maybe change sync_ee into interval trees as well? */
2039 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2040 {
2041         struct drbd_peer_request *rs_req;
2042         bool rv = false;
2043
2044         spin_lock_irq(&device->resource->req_lock);
2045         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2046                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2047                              rs_req->i.sector, rs_req->i.size)) {
2048                         rv = true;
2049                         break;
2050                 }
2051         }
2052         spin_unlock_irq(&device->resource->req_lock);
2053
2054         return rv;
2055 }
2056
2057 /* Called from receive_Data.
2058  * Synchronize packets on sock with packets on msock.
2059  *
2060  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2061  * packet traveling on msock, they are still processed in the order they have
2062  * been sent.
2063  *
2064  * Note: we don't care for Ack packets overtaking P_DATA packets.
2065  *
2066  * In case peer_seq is larger than device->peer_seq, there are
2067  * outstanding packets on the msock. We wait for them to arrive.
2068  * In case we are the logically next packet, we update device->peer_seq
2069  * ourselves. Correctly handles 32bit wrap around.
2070  *
2071  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2072  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2073  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2074  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2075  *
2076  * returns 0 if we may process the packet,
2077  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2078 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2079 {
2080         struct drbd_device *device = peer_device->device;
2081         DEFINE_WAIT(wait);
2082         long timeout;
2083         int ret = 0, tp;
2084
2085         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2086                 return 0;
2087
2088         spin_lock(&device->peer_seq_lock);
2089         for (;;) {
2090                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2091                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2092                         break;
2093                 }
2094
2095                 if (signal_pending(current)) {
2096                         ret = -ERESTARTSYS;
2097                         break;
2098                 }
2099
2100                 rcu_read_lock();
2101                 tp = rcu_dereference(first_peer_device(device)->connection->net_conf)->two_primaries;
2102                 rcu_read_unlock();
2103
2104                 if (!tp)
2105                         break;
2106
2107                 /* Only need to wait if two_primaries is enabled */
2108                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2109                 spin_unlock(&device->peer_seq_lock);
2110                 rcu_read_lock();
2111                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2112                 rcu_read_unlock();
2113                 timeout = schedule_timeout(timeout);
2114                 spin_lock(&device->peer_seq_lock);
2115                 if (!timeout) {
2116                         ret = -ETIMEDOUT;
2117                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2118                         break;
2119                 }
2120         }
2121         spin_unlock(&device->peer_seq_lock);
2122         finish_wait(&device->seq_wait, &wait);
2123         return ret;
2124 }
2125
2126 /* see also bio_flags_to_wire()
2127  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2128  * flags and back. We may replicate to other kernel versions. */
2129 static unsigned long wire_flags_to_bio(u32 dpf)
2130 {
2131         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2132                 (dpf & DP_FUA ? REQ_FUA : 0) |
2133                 (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2134                 (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2135 }
2136
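/* Fail every local request in [sector, sector + size) that is still marked
 * RQ_POSTPONED.  Called with req_lock held; the lock is dropped briefly to
 * complete the master bio, so the overlap search is restarted each time. */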
2137 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2138                                     unsigned int size)
2139 {
2140         struct drbd_interval *i;
2141
2142     repeat:
2143         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2144                 struct drbd_request *req;
2145                 struct bio_and_error m;
2146
2147                 if (!i->local)
2148                         continue;
2149                 req = container_of(i, struct drbd_request, i);
2150                 if (!(req->rq_state & RQ_POSTPONED))
2151                         continue;
2152                 req->rq_state &= ~RQ_POSTPONED;
2153                 __req_mod(req, NEG_ACKED, &m);
2154                 spin_unlock_irq(&device->resource->req_lock);
2155                 if (m.bio)
2156                         complete_master_bio(device, &m);
2157                 spin_lock_irq(&device->resource->req_lock);
2158                 goto repeat;
2159         }
2160 }
2161
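/* Insert the peer write into the write_requests tree and resolve overlaps
 * with local requests.  If we are the node that resolves conflicts, the
 * peer request is either superseded or scheduled for retry; otherwise we
 * wait for the conflicting local request and restart it afterwards.
 * Called with req_lock held. */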
2162 static int handle_write_conflicts(struct drbd_device *device,
2163                                   struct drbd_peer_request *peer_req)
2164 {
2165         struct drbd_connection *connection = peer_req->peer_device->connection;
2166         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2167         sector_t sector = peer_req->i.sector;
2168         const unsigned int size = peer_req->i.size;
2169         struct drbd_interval *i;
2170         bool equal;
2171         int err;
2172
2173         /*
2174          * Inserting the peer request into the write_requests tree will prevent
2175          * new conflicting local requests from being added.
2176          */
2177         drbd_insert_interval(&device->write_requests, &peer_req->i);
2178
2179     repeat:
2180         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2181                 if (i == &peer_req->i)
2182                         continue;
2183                 if (i->completed)
2184                         continue;
2185
2186                 if (!i->local) {
2187                         /*
2188                          * Our peer has sent a conflicting remote request; this
2189                          * should not happen in a two-node setup.  Wait for the
2190                          * earlier peer request to complete.
2191                          */
2192                         err = drbd_wait_misc(device, i);
2193                         if (err)
2194                                 goto out;
2195                         goto repeat;
2196                 }
2197
2198                 equal = i->sector == sector && i->size == size;
2199                 if (resolve_conflicts) {
2200                         /*
2201                          * If the peer request is fully contained within the
2202                          * overlapping request, it can be considered overwritten
2203                          * and thus superseded; otherwise, it will be retried
2204                          * once all overlapping requests have completed.
2205                          */
2206                         bool superseded = i->sector <= sector && i->sector +
2207                                        (i->size >> 9) >= sector + (size >> 9);
2208
2209                         if (!equal)
2210                                 drbd_alert(device, "Concurrent writes detected: "
2211                                                "local=%llus +%u, remote=%llus +%u, "
2212                                                "assuming %s came first\n",
2213                                           (unsigned long long)i->sector, i->size,
2214                                           (unsigned long long)sector, size,
2215                                           superseded ? "local" : "remote");
2216
2217                         peer_req->w.cb = superseded ? e_send_superseded :
2218                                                    e_send_retry_write;
2219                         list_add_tail(&peer_req->w.list, &device->done_ee);
2220                         wake_asender(connection);
2221
2222                         err = -ENOENT;
2223                         goto out;
2224                 } else {
2225                         struct drbd_request *req =
2226                                 container_of(i, struct drbd_request, i);
2227
2228                         if (!equal)
2229                                 drbd_alert(device, "Concurrent writes detected: "
2230                                                "local=%llus +%u, remote=%llus +%u\n",
2231                                           (unsigned long long)i->sector, i->size,
2232                                           (unsigned long long)sector, size);
2233
2234                         if (req->rq_state & RQ_LOCAL_PENDING ||
2235                             !(req->rq_state & RQ_POSTPONED)) {
2236                                 /*
2237                                  * Wait for the node with the discard flag to
2238                                  * decide if this request has been superseded
2239                                  * or needs to be retried.
2240                                  * Requests that have been superseded will
2241                                  * disappear from the write_requests tree.
2242                                  *
2243                                  * In addition, wait for the conflicting
2244                                  * request to finish locally before submitting
2245                                  * the conflicting peer request.
2246                                  */
2247                                 err = drbd_wait_misc(device, &req->i);
2248                                 if (err) {
2249                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2250                                         fail_postponed_requests(device, sector, size);
2251                                         goto out;
2252                                 }
2253                                 goto repeat;
2254                         }
2255                         /*
2256                          * Remember to restart the conflicting requests after
2257                          * the new peer request has completed.
2258                          */
2259                         peer_req->flags |= EE_RESTART_REQUESTS;
2260                 }
2261         }
2262         err = 0;
2263
2264     out:
2265         if (err)
2266                 drbd_remove_epoch_entry_interval(device, peer_req);
2267         return err;
2268 }
2269
2270 /* mirrored write */
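/* receive_Data() reads the payload into a peer request, assigns it to the
 * current epoch, sends or schedules the protocol ack, resolves conflicts
 * with local writes when two primaries are configured, and finally submits
 * the request to the local disk. */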
2271 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2272 {
2273         struct drbd_peer_device *peer_device;
2274         struct drbd_device *device;
2275         struct net_conf *nc;
2276         sector_t sector;
2277         struct drbd_peer_request *peer_req;
2278         struct p_data *p = pi->data;
2279         u32 peer_seq = be32_to_cpu(p->seq_num);
2280         int rw = WRITE;
2281         u32 dp_flags;
2282         int err, tp;
2283
2284         peer_device = conn_peer_device(connection, pi->vnr);
2285         if (!peer_device)
2286                 return -EIO;
2287         device = peer_device->device;
2288
2289         if (!get_ldev(device)) {
2290                 int err2;
2291
2292                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2293                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2294                 atomic_inc(&connection->current_epoch->epoch_size);
2295                 err2 = drbd_drain_block(peer_device, pi->size);
2296                 if (!err)
2297                         err = err2;
2298                 return err;
2299         }
2300
2301         /*
2302          * Corresponding put_ldev done either below (on various errors), or in
2303          * drbd_peer_request_endio, if we successfully submit the data at the
2304          * end of this function.
2305          */
2306
2307         sector = be64_to_cpu(p->sector);
2308         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2309         if (!peer_req) {
2310                 put_ldev(device);
2311                 return -EIO;
2312         }
2313
2314         peer_req->w.cb = e_end_block;
2315         peer_req->submit_jif = jiffies;
2316         peer_req->flags |= EE_APPLICATION;
2317
2318         dp_flags = be32_to_cpu(p->dp_flags);
2319         rw |= wire_flags_to_bio(dp_flags);
2320         if (pi->cmd == P_TRIM) {
2321                 struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
2322                 peer_req->flags |= EE_IS_TRIM;
2323                 if (!blk_queue_discard(q))
2324                         peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
2325                 D_ASSERT(peer_device, peer_req->i.size > 0);
2326                 D_ASSERT(peer_device, rw & REQ_DISCARD);
2327                 D_ASSERT(peer_device, peer_req->pages == NULL);
2328         } else if (peer_req->pages == NULL) {
2329                 D_ASSERT(device, peer_req->i.size == 0);
2330                 D_ASSERT(device, dp_flags & DP_FLUSH);
2331         }
2332
2333         if (dp_flags & DP_MAY_SET_IN_SYNC)
2334                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2335
2336         spin_lock(&connection->epoch_lock);
2337         peer_req->epoch = connection->current_epoch;
2338         atomic_inc(&peer_req->epoch->epoch_size);
2339         atomic_inc(&peer_req->epoch->active);
2340         spin_unlock(&connection->epoch_lock);
2341
2342         rcu_read_lock();
2343         nc = rcu_dereference(peer_device->connection->net_conf);
2344         tp = nc->two_primaries;
2345         if (peer_device->connection->agreed_pro_version < 100) {
2346                 switch (nc->wire_protocol) {
2347                 case DRBD_PROT_C:
2348                         dp_flags |= DP_SEND_WRITE_ACK;
2349                         break;
2350                 case DRBD_PROT_B:
2351                         dp_flags |= DP_SEND_RECEIVE_ACK;
2352                         break;
2353                 }
2354         }
2355         rcu_read_unlock();
2356
2357         if (dp_flags & DP_SEND_WRITE_ACK) {
2358                 peer_req->flags |= EE_SEND_WRITE_ACK;
2359                 inc_unacked(device);
2360                 /* corresponding dec_unacked() in e_end_block(),
2361                  * or in _drbd_clear_done_ee(), respectively */
2362         }
2363
2364         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2365                 /* I really don't like it that the receiver thread
2366                  * sends on the msock, but anyways */
2367                 drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
2368         }
2369
2370         if (tp) {
2371                 /* two primaries implies protocol C */
2372                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2373                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2374                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2375                 if (err)
2376                         goto out_interrupted;
2377                 spin_lock_irq(&device->resource->req_lock);
2378                 err = handle_write_conflicts(device, peer_req);
2379                 if (err) {
2380                         spin_unlock_irq(&device->resource->req_lock);
2381                         if (err == -ENOENT) {
2382                                 put_ldev(device);
2383                                 return 0;
2384                         }
2385                         goto out_interrupted;
2386                 }
2387         } else {
2388                 update_peer_seq(peer_device, peer_seq);
2389                 spin_lock_irq(&device->resource->req_lock);
2390         }
2391         /* if we use the zeroout fallback code, we process synchronously:
2392          * drbd_submit_peer_request() then waits for all pending requests,
2393          * i.e. for active_ee to become empty, and adds the request to
2394          * active_ee itself; better not add ourselves here. */
2395         if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
2396                 list_add_tail(&peer_req->w.list, &device->active_ee);
2397         spin_unlock_irq(&device->resource->req_lock);
2398
2399         if (device->state.conn == C_SYNC_TARGET)
2400                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2401
2402         if (device->state.pdsk < D_INCONSISTENT) {
2403                 /* In case we have the only disk of the cluster: mark the range out of sync, it must be resynced once the peer has a disk again. */
2404                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2405                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2406                 drbd_al_begin_io(device, &peer_req->i);
2407                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2408         }
2409
2410         err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
2411         if (!err)
2412                 return 0;
2413
2414         /* don't care for the reason here */
2415         drbd_err(device, "submit failed, triggering re-connect\n");
2416         spin_lock_irq(&device->resource->req_lock);
2417         list_del(&peer_req->w.list);
2418         drbd_remove_epoch_entry_interval(device, peer_req);
2419         spin_unlock_irq(&device->resource->req_lock);
2420         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2421                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2422                 drbd_al_complete_io(device, &peer_req->i);
2423         }
2424
2425 out_interrupted:
2426         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
2427         put_ldev(device);
2428         drbd_free_peer_req(device, peer_req);
2429         return err;
2430 }
2431
2432 /* We may throttle resync, if the lower device seems to be busy,
2433  * and current sync rate is above c_min_rate.
2434  *
2435  * To decide whether or not the lower device is busy, we use a scheme similar
2436  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2437  * (more than 64 sectors) of activity we cannot account for with our own resync
2438  * activity, it obviously is "busy".
2439  *
2440  * The current sync rate used here uses only the most recent two step marks,
2441  * to have a short time average so we can react faster.
2442  */
2443 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2444                 bool throttle_if_app_is_waiting)
2445 {
2446         struct lc_element *tmp;
2447         bool throttle = drbd_rs_c_min_rate_throttle(device);
2448
2449         if (!throttle || throttle_if_app_is_waiting)
2450                 return throttle;
2451
2452         spin_lock_irq(&device->al_lock);
2453         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2454         if (tmp) {
2455                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2456                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2457                         throttle = false;
2458                 /* Do not slow down if app IO is already waiting for this extent,
2459                  * and our progress is necessary for application IO to complete. */
2460         }
2461         spin_unlock_irq(&device->al_lock);
2462
2463         return throttle;
2464 }
2465
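/* Compare the short term resync rate against the configured c-min-rate.
 * The rate is estimated from the two most recent sync marks, roughly
 *   dbdt = Bit2KB((rs_mark_left[i] - rs_left) / dt)        [KB/sec]
 * and is only evaluated if the backing device shows activity we cannot
 * account for ourselves (pending application IO, or more than 64
 * unaccounted sectors). */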
2466 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2467 {
2468         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2469         unsigned long db, dt, dbdt;
2470         unsigned int c_min_rate;
2471         int curr_events;
2472
2473         rcu_read_lock();
2474         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2475         rcu_read_unlock();
2476
2477         /* feature disabled? */
2478         if (c_min_rate == 0)
2479                 return false;
2480
2481         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2482                       (int)part_stat_read(&disk->part0, sectors[1]) -
2483                         atomic_read(&device->rs_sect_ev);
2484
2485         if (atomic_read(&device->ap_actlog_cnt)
2486             || curr_events - device->rs_last_events > 64) {
2487                 unsigned long rs_left;
2488                 int i;
2489
2490                 device->rs_last_events = curr_events;
2491
2492                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2493                  * approx. */
2494                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2495
2496                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2497                         rs_left = device->ov_left;
2498                 else
2499                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2500
2501                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2502                 if (!dt)
2503                         dt++;
2504                 db = device->rs_mark_left[i] - rs_left;
2505                 dbdt = Bit2KB(db/dt);
2506
2507                 if (dbdt > c_min_rate)
2508                         return true;
2509         }
2510         return false;
2511 }
2512
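/* Serve a read-type request from the peer: P_DATA_REQUEST, resync reads,
 * checksum based resync requests and online verify.  Sanity check the
 * range, allocate a peer request, possibly throttle the resync, and submit
 * the local read; the reply is sent from the respective w_e_end_* callback. */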
2513 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2514 {
2515         struct drbd_peer_device *peer_device;
2516         struct drbd_device *device;
2517         sector_t sector;
2518         sector_t capacity;
2519         struct drbd_peer_request *peer_req;
2520         struct digest_info *di = NULL;
2521         int size, verb;
2522         unsigned int fault_type;
2523         struct p_block_req *p = pi->data;
2524
2525         peer_device = conn_peer_device(connection, pi->vnr);
2526         if (!peer_device)
2527                 return -EIO;
2528         device = peer_device->device;
2529         capacity = drbd_get_capacity(device->this_bdev);
2530
2531         sector = be64_to_cpu(p->sector);
2532         size   = be32_to_cpu(p->blksize);
2533
2534         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2535                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2536                                 (unsigned long long)sector, size);
2537                 return -EINVAL;
2538         }
2539         if (sector + (size>>9) > capacity) {
2540                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2541                                 (unsigned long long)sector, size);
2542                 return -EINVAL;
2543         }
2544
2545         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2546                 verb = 1;
2547                 switch (pi->cmd) {
2548                 case P_DATA_REQUEST:
2549                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2550                         break;
2551                 case P_RS_DATA_REQUEST:
2552                 case P_CSUM_RS_REQUEST:
2553                 case P_OV_REQUEST:
2554                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2555                         break;
2556                 case P_OV_REPLY:
2557                         verb = 0;
2558                         dec_rs_pending(device);
2559                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2560                         break;
2561                 default:
2562                         BUG();
2563                 }
2564                 if (verb && __ratelimit(&drbd_ratelimit_state))
2565                         drbd_err(device, "Can not satisfy peer's read request, "
2566                             "no local data.\n");
2567
2568                 /* drain the payload, if any */
2569                 return drbd_drain_block(peer_device, pi->size);
2570         }
2571
2572         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2573          * "criss-cross" setup, that might cause write-out on some other DRBD,
2574          * which in turn might block on the other node at this very place.  */
2575         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2576                         true /* has real payload */, GFP_NOIO);
2577         if (!peer_req) {
2578                 put_ldev(device);
2579                 return -ENOMEM;
2580         }
2581
2582         switch (pi->cmd) {
2583         case P_DATA_REQUEST:
2584                 peer_req->w.cb = w_e_end_data_req;
2585                 fault_type = DRBD_FAULT_DT_RD;
2586                 /* application IO, don't drbd_rs_begin_io */
2587                 peer_req->flags |= EE_APPLICATION;
2588                 goto submit;
2589
2590         case P_RS_DATA_REQUEST:
2591                 peer_req->w.cb = w_e_end_rsdata_req;
2592                 fault_type = DRBD_FAULT_RS_RD;
2593                 /* used in the sector offset progress display */
2594                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2595                 break;
2596
2597         case P_OV_REPLY:
2598         case P_CSUM_RS_REQUEST:
2599                 fault_type = DRBD_FAULT_RS_RD;
2600                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2601                 if (!di)
2602                         goto out_free_e;
2603
2604                 di->digest_size = pi->size;
2605                 di->digest = (((char *)di)+sizeof(struct digest_info));
2606
2607                 peer_req->digest = di;
2608                 peer_req->flags |= EE_HAS_DIGEST;
2609
2610                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2611                         goto out_free_e;
2612
2613                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2614                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2615                         peer_req->w.cb = w_e_end_csum_rs_req;
2616                         /* used in the sector offset progress display */
2617                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2618                         /* remember to report stats in drbd_resync_finished */
2619                         device->use_csums = true;
2620                 } else if (pi->cmd == P_OV_REPLY) {
2621                         /* track progress, we may need to throttle */
2622                         atomic_add(size >> 9, &device->rs_sect_in);
2623                         peer_req->w.cb = w_e_end_ov_reply;
2624                         dec_rs_pending(device);
2625                         /* drbd_rs_begin_io done when we sent this request,
2626                          * but accounting still needs to be done. */
2627                         goto submit_for_resync;
2628                 }
2629                 break;
2630
2631         case P_OV_REQUEST:
2632                 if (device->ov_start_sector == ~(sector_t)0 &&
2633                     peer_device->connection->agreed_pro_version >= 90) {
2634                         unsigned long now = jiffies;
2635                         int i;
2636                         device->ov_start_sector = sector;
2637                         device->ov_position = sector;
2638                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2639                         device->rs_total = device->ov_left;
2640                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2641                                 device->rs_mark_left[i] = device->ov_left;
2642                                 device->rs_mark_time[i] = now;
2643                         }
2644                         drbd_info(device, "Online Verify start sector: %llu\n",
2645                                         (unsigned long long)sector);
2646                 }
2647                 peer_req->w.cb = w_e_end_ov_req;
2648                 fault_type = DRBD_FAULT_RS_RD;
2649                 break;
2650
2651         default:
2652                 BUG();
2653         }
2654
2655         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2656          * wrt the receiver, but it is not as straightforward as it may seem.
2657          * Various places in the resync start and stop logic assume resync
2658          * requests are processed in order, requeuing this on the worker thread
2659          * introduces a bunch of new code for synchronization between threads.
2660          *
2661          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2662          * "forever", throttling after drbd_rs_begin_io will lock that extent
2663          * for application writes for the same time.  For now, just throttle
2664          * here, where the rest of the code expects the receiver to sleep for
2665          * a while, anyways.
2666          */
2667
2668         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2669          * this defers syncer requests for some time, before letting at least
2670          * one request through.  The resync controller on the receiving side
2671          * will adapt to the incoming rate accordingly.
2672          *
2673          * We cannot throttle here if remote is Primary/SyncTarget:
2674          * we would also throttle its application reads.
2675          * In that case, throttling is done on the SyncTarget only.
2676          */
2677
2678         /* Even though this may be a resync request, we do add to "read_ee";
2679          * "sync_ee" is only used for resync WRITEs.
2680          * Add to list early, so debugfs can find this request
2681          * even if we have to sleep below. */
2682         spin_lock_irq(&device->resource->req_lock);
2683         list_add_tail(&peer_req->w.list, &device->read_ee);
2684         spin_unlock_irq(&device->resource->req_lock);
2685
2686         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2687         if (device->state.peer != R_PRIMARY
2688         && drbd_rs_should_slow_down(device, sector, false))
2689                 schedule_timeout_uninterruptible(HZ/10);
2690         update_receiver_timing_details(connection, drbd_rs_begin_io);
2691         if (drbd_rs_begin_io(device, sector))
2692                 goto out_free_e;
2693
2694 submit_for_resync:
2695         atomic_add(size >> 9, &device->rs_sect_ev);
2696
2697 submit:
2698         update_receiver_timing_details(connection, drbd_submit_peer_request);
2699         inc_unacked(device);
2700         if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
2701                 return 0;
2702
2703         /* don't care for the reason here */
2704         drbd_err(device, "submit failed, triggering re-connect\n");
2705
2706 out_free_e:
2707         spin_lock_irq(&device->resource->req_lock);
2708         list_del(&peer_req->w.list);
2709         spin_unlock_irq(&device->resource->req_lock);
2710         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2711
2712         put_ldev(device);
2713         drbd_free_peer_req(device, peer_req);
2714         return -EIO;
2715 }
2716
2717 /**
2718  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2719  */
2720 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2721 {
2722         struct drbd_device *device = peer_device->device;
2723         int self, peer, rv = -100;
2724         unsigned long ch_self, ch_peer;
2725         enum drbd_after_sb_p after_sb_0p;
2726
2727         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2728         peer = device->p_uuid[UI_BITMAP] & 1;
2729
2730         ch_peer = device->p_uuid[UI_SIZE];
2731         ch_self = device->comm_bm_set;
2732
2733         rcu_read_lock();
2734         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2735         rcu_read_unlock();
2736         switch (after_sb_0p) {
2737         case ASB_CONSENSUS:
2738         case ASB_DISCARD_SECONDARY:
2739         case ASB_CALL_HELPER:
2740         case ASB_VIOLENTLY:
2741                 drbd_err(device, "Configuration error.\n");
2742                 break;
2743         case ASB_DISCONNECT:
2744                 break;
2745         case ASB_DISCARD_YOUNGER_PRI:
2746                 if (self == 0 && peer == 1) {
2747                         rv = -1;
2748                         break;
2749                 }
2750                 if (self == 1 && peer == 0) {
2751                         rv =  1;
2752                         break;
2753                 }
2754                 /* Else fall through to one of the other strategies... */
2755         case ASB_DISCARD_OLDER_PRI:
2756                 if (self == 0 && peer == 1) {
2757                         rv = 1;
2758                         break;
2759                 }
2760                 if (self == 1 && peer == 0) {
2761                         rv = -1;
2762                         break;
2763                 }
2764                 /* Else fall through to one of the other strategies... */
2765                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
2766                      "Using discard-least-changes instead\n");
2767         case ASB_DISCARD_ZERO_CHG:
2768                 if (ch_peer == 0 && ch_self == 0) {
2769                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2770                                 ? -1 : 1;
2771                         break;
2772                 } else {
2773                         if (ch_peer == 0) { rv =  1; break; }
2774                         if (ch_self == 0) { rv = -1; break; }
2775                 }
2776                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2777                         break;
2778         case ASB_DISCARD_LEAST_CHG:
2779                 if      (ch_self < ch_peer)
2780                         rv = -1;
2781                 else if (ch_self > ch_peer)
2782                         rv =  1;
2783                 else /* ( ch_self == ch_peer ) */
2784                      /* Well, then use something else. */
2785                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
2786                                 ? -1 : 1;
2787                 break;
2788         case ASB_DISCARD_LOCAL:
2789                 rv = -1;
2790                 break;
2791         case ASB_DISCARD_REMOTE:
2792                 rv =  1;
2793         }
2794
2795         return rv;
2796 }
2797
2798 /**
2799  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
2800  */
2801 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
2802 {
2803         struct drbd_device *device = peer_device->device;
2804         int hg, rv = -100;
2805         enum drbd_after_sb_p after_sb_1p;
2806
2807         rcu_read_lock();
2808         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
2809         rcu_read_unlock();
2810         switch (after_sb_1p) {
2811         case ASB_DISCARD_YOUNGER_PRI:
2812         case ASB_DISCARD_OLDER_PRI:
2813         case ASB_DISCARD_LEAST_CHG:
2814         case ASB_DISCARD_LOCAL:
2815         case ASB_DISCARD_REMOTE:
2816         case ASB_DISCARD_ZERO_CHG:
2817                 drbd_err(device, "Configuration error.\n");
2818                 break;
2819         case ASB_DISCONNECT:
2820                 break;
2821         case ASB_CONSENSUS:
2822                 hg = drbd_asb_recover_0p(peer_device);
2823                 if (hg == -1 && device->state.role == R_SECONDARY)
2824                         rv = hg;
2825                 if (hg == 1  && device->state.role == R_PRIMARY)
2826                         rv = hg;
2827                 break;
2828         case ASB_VIOLENTLY:
2829                 rv = drbd_asb_recover_0p(peer_device);
2830                 break;
2831         case ASB_DISCARD_SECONDARY:
2832                 return device->state.role == R_PRIMARY ? 1 : -1;
2833         case ASB_CALL_HELPER:
2834                 hg = drbd_asb_recover_0p(peer_device);
2835                 if (hg == -1 && device->state.role == R_PRIMARY) {
2836                         enum drbd_state_rv rv2;
2837
2838                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2839                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2840                           * we do not need to wait for the after state change work either. */
2841                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2842                         if (rv2 != SS_SUCCESS) {
2843                                 drbd_khelper(device, "pri-lost-after-sb");
2844                         } else {
2845                                 drbd_warn(device, "Successfully gave up primary role.\n");
2846                                 rv = hg;
2847                         }
2848                 } else
2849                         rv = hg;
2850         }
2851
2852         return rv;
2853 }
2854
2855 /**
2856  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
2857  */
2858 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
2859 {
2860         struct drbd_device *device = peer_device->device;
2861         int hg, rv = -100;
2862         enum drbd_after_sb_p after_sb_2p;
2863
2864         rcu_read_lock();
2865         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
2866         rcu_read_unlock();
2867         switch (after_sb_2p) {
2868         case ASB_DISCARD_YOUNGER_PRI:
2869         case ASB_DISCARD_OLDER_PRI:
2870         case ASB_DISCARD_LEAST_CHG:
2871         case ASB_DISCARD_LOCAL:
2872         case ASB_DISCARD_REMOTE:
2873         case ASB_CONSENSUS:
2874         case ASB_DISCARD_SECONDARY:
2875         case ASB_DISCARD_ZERO_CHG:
2876                 drbd_err(device, "Configuration error.\n");
2877                 break;
2878         case ASB_VIOLENTLY:
2879                 rv = drbd_asb_recover_0p(peer_device);
2880                 break;
2881         case ASB_DISCONNECT:
2882                 break;
2883         case ASB_CALL_HELPER:
2884                 hg = drbd_asb_recover_0p(peer_device);
2885                 if (hg == -1) {
2886                         enum drbd_state_rv rv2;
2887
2888                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2889                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2890                           * we do not need to wait for the after state change work either. */
2891                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
2892                         if (rv2 != SS_SUCCESS) {
2893                                 drbd_khelper(device, "pri-lost-after-sb");
2894                         } else {
2895                                 drbd_warn(device, "Successfully gave up primary role.\n");
2896                                 rv = hg;
2897                         }
2898                 } else
2899                         rv = hg;
2900         }
2901
2902         return rv;
2903 }
2904
2905 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
2906                            u64 bits, u64 flags)
2907 {
2908         if (!uuid) {
2909                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
2910                 return;
2911         }
2912         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2913              text,
2914              (unsigned long long)uuid[UI_CURRENT],
2915              (unsigned long long)uuid[UI_BITMAP],
2916              (unsigned long long)uuid[UI_HISTORY_START],
2917              (unsigned long long)uuid[UI_HISTORY_END],
2918              (unsigned long long)bits,
2919              (unsigned long long)flags);
2920 }
2921
2922 /*
2923   100   after split brain try auto recover
2924     2   C_SYNC_SOURCE set BitMap
2925     1   C_SYNC_SOURCE use BitMap
2926     0   no Sync
2927    -1   C_SYNC_TARGET use BitMap
2928    -2   C_SYNC_TARGET set BitMap
2929  -100   after split brain, disconnect
2930 -1000   unrelated data
2931 -1091   requires proto 91
2932 -1096   requires proto 96
2933  */
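/* Note: the lowest bit of each UUID is used as a flag (see drbd_asb_recover_0p()),
 * so it is masked off (& ~(u64)1) before any comparison below. */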
2934 static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
2935 {
2936         struct drbd_peer_device *const peer_device = first_peer_device(device);
2937         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
2938         u64 self, peer;
2939         int i, j;
2940
2941         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2942         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
2943
2944         *rule_nr = 10;
2945         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2946                 return 0;
2947
2948         *rule_nr = 20;
2949         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2950              peer != UUID_JUST_CREATED)
2951                 return -2;
2952
2953         *rule_nr = 30;
2954         if (self != UUID_JUST_CREATED &&
2955             (peer == UUID_JUST_CREATED || peer == (u64)0))
2956                 return 2;
2957
2958         if (self == peer) {
2959                 int rct, dc; /* roles at crash time */
2960
2961                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2962
2963                         if (connection->agreed_pro_version < 91)
2964                                 return -1091;
2965
2966                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2967                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2968                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
2969                                 drbd_uuid_move_history(device);
2970                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
2971                                 device->ldev->md.uuid[UI_BITMAP] = 0;
2972
2973                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
2974                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
2975                                 *rule_nr = 34;
2976                         } else {
2977                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
2978                                 *rule_nr = 36;
2979                         }
2980
2981                         return 1;
2982                 }
2983
2984                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
2985
2986                         if (connection->agreed_pro_version < 91)
2987                                 return -1091;
2988
2989                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2990                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2991                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2992
2993                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
2994                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
2995                                 device->p_uuid[UI_BITMAP] = 0UL;
2996
2997                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
2998                                 *rule_nr = 35;
2999                         } else {
3000                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3001                                 *rule_nr = 37;
3002                         }
3003
3004                         return -1;
3005                 }
3006
3007                 /* Common power [off|failure] */
3008                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3009                         (device->p_uuid[UI_FLAGS] & 2);
3010                 /* lowest bit is set when we were primary,
3011                  * next bit (weight 2) is set when peer was primary */
3012                 *rule_nr = 40;
3013
3014                 switch (rct) {
3015                 case 0: /* !self_pri && !peer_pri */ return 0;
3016                 case 1: /*  self_pri && !peer_pri */ return 1;
3017                 case 2: /* !self_pri &&  peer_pri */ return -1;
3018                 case 3: /*  self_pri &&  peer_pri */
3019                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3020                         return dc ? -1 : 1;
3021                 }
3022         }
3023
3024         *rule_nr = 50;
3025         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3026         if (self == peer)
3027                 return -1;
3028
3029         *rule_nr = 51;
3030         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3031         if (self == peer) {
3032                 if (connection->agreed_pro_version < 96 ?
3033                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3034                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3035                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3036                         /* The last P_SYNC_UUID did not get through.  Undo the modifications
3037                            to the peer's UUIDs from the last start of resync as sync source. */
3038
3039                         if (connection->agreed_pro_version < 91)
3040                                 return -1091;
3041
3042                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3043                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3044
3045                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3046                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3047
3048                         return -1;
3049                 }
3050         }
3051
3052         *rule_nr = 60;
3053         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3054         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3055                 peer = device->p_uuid[i] & ~((u64)1);
3056                 if (self == peer)
3057                         return -2;
3058         }
3059
3060         *rule_nr = 70;
3061         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3062         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3063         if (self == peer)
3064                 return 1;
3065
3066         *rule_nr = 71;
3067         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3068         if (self == peer) {
3069                 if (connection->agreed_pro_version < 96 ?
3070                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3071                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3072                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3073                         /* The last P_SYNC_UUID did not get through.  Undo the modifications
3074                            to our UUIDs from the last start of resync as sync source. */
3075
3076                         if (connection->agreed_pro_version < 91)
3077                                 return -1091;
3078
3079                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3080                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3081
3082                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3083                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3084                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3085
3086                         return 1;
3087                 }
3088         }
3089
3090
3091         *rule_nr = 80;
3092         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3093         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3094                 self = device->ldev->md.uuid[i] & ~((u64)1);
3095                 if (self == peer)
3096                         return 2;
3097         }
3098
3099         *rule_nr = 90;
3100         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3101         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3102         if (self == peer && self != ((u64)0))
3103                 return 100;
3104
3105         *rule_nr = 100;
3106         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3107                 self = device->ldev->md.uuid[i] & ~((u64)1);
3108                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3109                         peer = device->p_uuid[j] & ~((u64)1);
3110                         if (self == peer)
3111                                 return -100;
3112                 }
3113         }
3114
3115         return -1000;
3116 }
3117
3118 /* drbd_sync_handshake() returns the new conn state on success, or
3119    CONN_MASK (-1) on failure.
3120  */
3121 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3122                                            enum drbd_role peer_role,
3123                                            enum drbd_disk_state peer_disk) __must_hold(local)
3124 {
3125         struct drbd_device *device = peer_device->device;
3126         enum drbd_conns rv = C_MASK;
3127         enum drbd_disk_state mydisk;
3128         struct net_conf *nc;
3129         int hg, rule_nr, rr_conflict, tentative;
3130
3131         mydisk = device->state.disk;
3132         if (mydisk == D_NEGOTIATING)
3133                 mydisk = device->new_state_tmp.disk;
3134
3135         drbd_info(device, "drbd_sync_handshake:\n");
3136
3137         spin_lock_irq(&device->ldev->md.uuid_lock);
3138         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3139         drbd_uuid_dump(device, "peer", device->p_uuid,
3140                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3141
3142         hg = drbd_uuid_compare(device, &rule_nr);
3143         spin_unlock_irq(&device->ldev->md.uuid_lock);
3144
3145         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3146
3147         if (hg == -1000) {
3148                 drbd_alert(device, "Unrelated data, aborting!\n");
3149                 return C_MASK;
3150         }
3151         if (hg < -1000) {
3152                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3153                 return C_MASK;
3154         }
3155
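             /* From here on hg encodes direction and kind of resync:
              * hg > 0: this node becomes sync source, hg < 0: sync target;
              * |hg| >= 2 means full sync (set the whole bitmap),
              * |hg| == 1 means bitmap based resync. */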
3156         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3157             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3158                 int f = (hg == -100) || abs(hg) == 2;
3159                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3160                 if (f)
3161                         hg = hg*2;
3162                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3163                      hg > 0 ? "source" : "target");
3164         }
3165
3166         if (abs(hg) == 100)
3167                 drbd_khelper(device, "initial-split-brain");
3168
3169         rcu_read_lock();
3170         nc = rcu_dereference(peer_device->connection->net_conf);
3171
3172         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3173                 int pcount = (device->state.role == R_PRIMARY)
3174                            + (peer_role == R_PRIMARY);
3175                 int forced = (hg == -100);
3176
3177                 switch (pcount) {
3178                 case 0:
3179                         hg = drbd_asb_recover_0p(peer_device);
3180                         break;
3181                 case 1:
3182                         hg = drbd_asb_recover_1p(peer_device);
3183                         break;
3184                 case 2:
3185                         hg = drbd_asb_recover_2p(peer_device);
3186                         break;
3187                 }
3188                 if (abs(hg) < 100) {
3189                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3190                              "automatically solved. Sync from %s node\n",
3191                              pcount, (hg < 0) ? "peer" : "this");
3192                         if (forced) {
3193                                 drbd_warn(device, "Doing a full sync, since"
3194                                      " UUIDs were ambiguous.\n");
3195                                 hg = hg*2;
3196                         }
3197                 }
3198         }
3199
3200         if (hg == -100) {
3201                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3202                         hg = -1;
3203                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3204                         hg = 1;
3205
3206                 if (abs(hg) < 100)
3207                         drbd_warn(device, "Split-Brain detected, manually solved. "
3208                              "Sync from %s node\n",
3209                              (hg < 0) ? "peer" : "this");
3210         }
3211         rr_conflict = nc->rr_conflict;
3212         tentative = nc->tentative;
3213         rcu_read_unlock();
3214
3215         if (hg == -100) {
3216                 /* FIXME this log message is not correct if we end up here
3217                  * after an attempted attach on a diskless node.
3218                  * We just refuse to attach -- well, we drop the "connection"
3219                  * to that disk, in a way... */
3220                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3221                 drbd_khelper(device, "split-brain");
3222                 return C_MASK;
3223         }
3224
3225         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3226                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3227                 return C_MASK;
3228         }
3229
3230         if (hg < 0 && /* by intention we do not use mydisk here. */
3231             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3232                 switch (rr_conflict) {
3233                 case ASB_CALL_HELPER:
3234                         drbd_khelper(device, "pri-lost");
3235                         /* fall through */
3236                 case ASB_DISCONNECT:
3237                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3238                         return C_MASK;
3239                 case ASB_VIOLENTLY:
3240                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data "
3241                              "assumption\n");
3242                 }
3243         }
3244
3245         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3246                 if (hg == 0)
3247                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3248                 else
3249                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.\n",
3250                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3251                                  abs(hg) >= 2 ? "full" : "bit-map based");
3252                 return C_MASK;
3253         }
3254
3255         if (abs(hg) >= 2) {
3256                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3257                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3258                                         BM_LOCKED_SET_ALLOWED))
3259                         return C_MASK;
3260         }
3261
3262         if (hg > 0) { /* become sync source. */
3263                 rv = C_WF_BITMAP_S;
3264         } else if (hg < 0) { /* become sync target */
3265                 rv = C_WF_BITMAP_T;
3266         } else {
3267                 rv = C_CONNECTED;
3268                 if (drbd_bm_total_weight(device)) {
3269                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3270                              drbd_bm_total_weight(device));
3271                 }
3272         }
3273
3274         return rv;
3275 }
3276
3277 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3278 {
3279         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3280         if (peer == ASB_DISCARD_REMOTE)
3281                 return ASB_DISCARD_LOCAL;
3282
3283         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3284         if (peer == ASB_DISCARD_LOCAL)
3285                 return ASB_DISCARD_REMOTE;
3286
3287         /* everything else is valid if they are equal on both sides. */
3288         return peer;
3289 }
3290
3291 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3292 {
3293         struct p_protocol *p = pi->data;
3294         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3295         int p_proto, p_discard_my_data, p_two_primaries, cf;
3296         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3297         char integrity_alg[SHARED_SECRET_MAX] = "";
3298         struct crypto_hash *peer_integrity_tfm = NULL;
3299         void *int_dig_in = NULL, *int_dig_vv = NULL;
3300
3301         p_proto         = be32_to_cpu(p->protocol);
3302         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3303         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3304         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3305         p_two_primaries = be32_to_cpu(p->two_primaries);
3306         cf              = be32_to_cpu(p->conn_flags);
3307         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3308
3309         if (connection->agreed_pro_version >= 87) {
3310                 int err;
3311
3312                 if (pi->size > sizeof(integrity_alg))
3313                         return -EIO;
3314                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3315                 if (err)
3316                         return err;
3317                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3318         }
3319
3320         if (pi->cmd != P_PROTOCOL_UPDATE) {
3321                 clear_bit(CONN_DRY_RUN, &connection->flags);
3322
3323                 if (cf & CF_DRY_RUN)
3324                         set_bit(CONN_DRY_RUN, &connection->flags);
3325
3326                 rcu_read_lock();
3327                 nc = rcu_dereference(connection->net_conf);
3328
3329                 if (p_proto != nc->wire_protocol) {
3330                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3331                         goto disconnect_rcu_unlock;
3332                 }
3333
3334                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3335                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3336                         goto disconnect_rcu_unlock;
3337                 }
3338
3339                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3340                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3341                         goto disconnect_rcu_unlock;
3342                 }
3343
3344                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3345                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3346                         goto disconnect_rcu_unlock;
3347                 }
3348
3349                 if (p_discard_my_data && nc->discard_my_data) {
3350                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3351                         goto disconnect_rcu_unlock;
3352                 }
3353
3354                 if (p_two_primaries != nc->two_primaries) {
3355                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3356                         goto disconnect_rcu_unlock;
3357                 }
3358
3359                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3360                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3361                         goto disconnect_rcu_unlock;
3362                 }
3363
3364                 rcu_read_unlock();
3365         }
3366
3367         if (integrity_alg[0]) {
3368                 int hash_size;
3369
3370                 /*
3371                  * We can only change the peer data integrity algorithm
3372                  * here.  Changing our own data integrity algorithm
3373                  * requires that we send a P_PROTOCOL_UPDATE packet at
3374                  * the same time; otherwise, the peer has no way to
3375                  * tell between which packets the algorithm should
3376                  * change.
3377                  */
3378
3379                 peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3380                 if (!peer_integrity_tfm) {
3381                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3382                                  integrity_alg);
3383                         goto disconnect;
3384                 }
3385
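                     /* two scratch buffers of digest size: int_dig_in for the digest
                      * received from the peer, int_dig_vv for the digest we compute
                      * locally to verify against it */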
3386                 hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3387                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3388                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3389                 if (!(int_dig_in && int_dig_vv)) {
3390                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3391                         goto disconnect;
3392                 }
3393         }
3394
3395         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3396         if (!new_net_conf) {
3397                 drbd_err(connection, "Allocation of new net_conf failed\n");
3398                 goto disconnect;
3399         }
3400
3401         mutex_lock(&connection->data.mutex);
3402         mutex_lock(&connection->resource->conf_update);
3403         old_net_conf = connection->net_conf;
3404         *new_net_conf = *old_net_conf;
3405
3406         new_net_conf->wire_protocol = p_proto;
3407         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3408         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3409         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3410         new_net_conf->two_primaries = p_two_primaries;
3411
3412         rcu_assign_pointer(connection->net_conf, new_net_conf);
3413         mutex_unlock(&connection->resource->conf_update);
3414         mutex_unlock(&connection->data.mutex);
3415
3416         crypto_free_hash(connection->peer_integrity_tfm);
3417         kfree(connection->int_dig_in);
3418         kfree(connection->int_dig_vv);
3419         connection->peer_integrity_tfm = peer_integrity_tfm;
3420         connection->int_dig_in = int_dig_in;
3421         connection->int_dig_vv = int_dig_vv;
3422
3423         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3424                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3425                           integrity_alg[0] ? integrity_alg : "(none)");
3426
3427         synchronize_rcu();
3428         kfree(old_net_conf);
3429         return 0;
3430
3431 disconnect_rcu_unlock:
3432         rcu_read_unlock();
3433 disconnect:
3434         crypto_free_hash(peer_integrity_tfm);
3435         kfree(int_dig_in);
3436         kfree(int_dig_vv);
3437         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3438         return -EIO;
3439 }
3440
3441 /* helper function
3442  * input: alg name, feature name
3443  * return: NULL (alg name was "")
3444  *         ERR_PTR(error) if something goes wrong
3445  *         or the crypto hash ptr, if it worked out ok. */
3446 static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3447                 const char *alg, const char *name)
3448 {
3449         struct crypto_hash *tfm;
3450
3451         if (!alg[0])
3452                 return NULL;
3453
3454         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3455         if (IS_ERR(tfm)) {
3456                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3457                         alg, name, PTR_ERR(tfm));
3458                 return tfm;
3459         }
3460         return tfm;
3461 }
3462
3463 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3464 {
3465         void *buffer = connection->data.rbuf;
3466         int size = pi->size;
3467
3468         while (size) {
3469                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3470                 s = drbd_recv(connection, buffer, s);
3471                 if (s <= 0) {
3472                         if (s < 0)
3473                                 return s;
3474                         break;
3475                 }
3476                 size -= s;
3477         }
3478         if (size)
3479                 return -EIO;
3480         return 0;
3481 }
3482
3483 /*
3484  * config_unknown_volume  -  device configuration command for unknown volume
3485  *
3486  * When a device is added to an existing connection, the node on which the
3487  * device is added first will send configuration commands to its peer but the
3488  * peer will not know about the device yet.  It will warn and ignore these
3489  * commands.  Once the device is added on the second node, the second node will
3490  * send the same device configuration commands, but in the other direction.
3491  *
3492  * (We can also end up here if drbd is misconfigured.)
3493  */
3494 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3495 {
3496         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3497                   cmdname(pi->cmd), pi->vnr);
3498         return ignore_remaining_packet(connection, pi);
3499 }
3500
3501 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3502 {
3503         struct drbd_peer_device *peer_device;
3504         struct drbd_device *device;
3505         struct p_rs_param_95 *p;
3506         unsigned int header_size, data_size, exp_max_sz;
3507         struct crypto_hash *verify_tfm = NULL;
3508         struct crypto_hash *csums_tfm = NULL;
3509         struct net_conf *old_net_conf, *new_net_conf = NULL;
3510         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3511         const int apv = connection->agreed_pro_version;
3512         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3513         int fifo_size = 0;
3514         int err;
3515
3516         peer_device = conn_peer_device(connection, pi->vnr);
3517         if (!peer_device)
3518                 return config_unknown_volume(connection, pi);
3519         device = peer_device->device;
3520
3521         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3522                     : apv == 88 ? sizeof(struct p_rs_param)
3523                                         + SHARED_SECRET_MAX
3524                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3525                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
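             /* The SyncParam packet grew with the protocol: apv 88 appended the
              * verify-alg name as trailing data, apv 89 moved verify-alg and
              * csums-alg into the header, apv 95 added the dynamic resync
              * controller parameters. */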
3526
3527         if (pi->size > exp_max_sz) {
3528                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3529                     pi->size, exp_max_sz);
3530                 return -EIO;
3531         }
3532
3533         if (apv <= 88) {
3534                 header_size = sizeof(struct p_rs_param);
3535                 data_size = pi->size - header_size;
3536         } else if (apv <= 94) {
3537                 header_size = sizeof(struct p_rs_param_89);
3538                 data_size = pi->size - header_size;
3539                 D_ASSERT(device, data_size == 0);
3540         } else {
3541                 header_size = sizeof(struct p_rs_param_95);
3542                 data_size = pi->size - header_size;
3543                 D_ASSERT(device, data_size == 0);
3544         }
3545
3546         /* initialize verify_alg and csums_alg */
3547         p = pi->data;
3548         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3549
3550         err = drbd_recv_all(peer_device->connection, p, header_size);
3551         if (err)
3552                 return err;
3553
3554         mutex_lock(&connection->resource->conf_update);
3555         old_net_conf = peer_device->connection->net_conf;
3556         if (get_ldev(device)) {
3557                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3558                 if (!new_disk_conf) {
3559                         put_ldev(device);
3560                         mutex_unlock(&connection->resource->conf_update);
3561                         drbd_err(device, "Allocation of new disk_conf failed\n");
3562                         return -ENOMEM;
3563                 }
3564
3565                 old_disk_conf = device->ldev->disk_conf;
3566                 *new_disk_conf = *old_disk_conf;
3567
3568                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3569         }
3570
3571         if (apv >= 88) {
3572                 if (apv == 88) {
3573                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3574                                 drbd_err(device, "verify-alg of wrong size, "
3575                                         "peer wants %u, accepting only up to %u bytes\n",
3576                                         data_size, SHARED_SECRET_MAX);
3577                                 err = -EIO;
3578                                 goto reconnect;
3579                         }
3580
3581                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3582                         if (err)
3583                                 goto reconnect;
3584                         /* we expect NUL terminated string */
3585                         /* but just in case someone tries to be evil */
3586                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3587                         p->verify_alg[data_size-1] = 0;
3588
3589                 } else /* apv >= 89 */ {
3590                         /* we still expect NUL terminated strings */
3591                         /* but just in case someone tries to be evil */
3592                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3593                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3594                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3595                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3596                 }
3597
3598                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3599                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3600                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3601                                     old_net_conf->verify_alg, p->verify_alg);
3602                                 goto disconnect;
3603                         }
3604                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3605                                         p->verify_alg, "verify-alg");
3606                         if (IS_ERR(verify_tfm)) {
3607                                 verify_tfm = NULL;
3608                                 goto disconnect;
3609                         }
3610                 }
3611
3612                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3613                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3614                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3615                                     old_net_conf->csums_alg, p->csums_alg);
3616                                 goto disconnect;
3617                         }
3618                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3619                                         p->csums_alg, "csums-alg");
3620                         if (IS_ERR(csums_tfm)) {
3621                                 csums_tfm = NULL;
3622                                 goto disconnect;
3623                         }
3624                 }
3625
3626                 if (apv > 94 && new_disk_conf) {
3627                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3628                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3629                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3630                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3631
3632                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3633                         if (fifo_size != device->rs_plan_s->size) {
3634                                 new_plan = fifo_alloc(fifo_size);
3635                                 if (!new_plan) {
3636                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3637                                         put_ldev(device);
3638                                         goto disconnect;
3639                                 }
3640                         }
3641                 }
3642
3643                 if (verify_tfm || csums_tfm) {
3644                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3645                         if (!new_net_conf) {
3646                                 drbd_err(device, "Allocation of new net_conf failed\n");
3647                                 goto disconnect;
3648                         }
3649
3650                         *new_net_conf = *old_net_conf;
3651
3652                         if (verify_tfm) {
3653                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3654                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3655                                 crypto_free_hash(peer_device->connection->verify_tfm);
3656                                 peer_device->connection->verify_tfm = verify_tfm;
3657                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3658                         }
3659                         if (csums_tfm) {
3660                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3661                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3662                                 crypto_free_hash(peer_device->connection->csums_tfm);
3663                                 peer_device->connection->csums_tfm = csums_tfm;
3664                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3665                         }
3666                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3667                 }
3668         }
3669
3670         if (new_disk_conf) {
3671                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3672                 put_ldev(device);
3673         }
3674
3675         if (new_plan) {
3676                 old_plan = device->rs_plan_s;
3677                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3678         }
3679
3680         mutex_unlock(&connection->resource->conf_update);
3681         synchronize_rcu();
3682         if (new_net_conf)
3683                 kfree(old_net_conf);
3684         kfree(old_disk_conf);
3685         kfree(old_plan);
3686
3687         return 0;
3688
3689 reconnect:
3690         if (new_disk_conf) {
3691                 put_ldev(device);
3692                 kfree(new_disk_conf);
3693         }
3694         mutex_unlock(&connection->resource->conf_update);
3695         return -EIO;
3696
3697 disconnect:
3698         kfree(new_plan);
3699         if (new_disk_conf) {
3700                 put_ldev(device);
3701                 kfree(new_disk_conf);
3702         }
3703         mutex_unlock(&connection->resource->conf_update);
3704         /* just for completeness: actually not needed,
3705          * as this is not reached if csums_tfm was ok. */
3706         crypto_free_hash(csums_tfm);
3707         /* but free the verify_tfm again, if csums_tfm did not work out */
3708         crypto_free_hash(verify_tfm);
3709         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3710         return -EIO;
3711 }
3712
3713 /* warn if the arguments differ by more than 12.5% */
3714 static void warn_if_differ_considerably(struct drbd_device *device,
3715         const char *s, sector_t a, sector_t b)
3716 {
3717         sector_t d;
3718         if (a == 0 || b == 0)
3719                 return;
3720         d = (a > b) ? (a - b) : (b - a);
3721         if (d > (a>>3) || d > (b>>3))
3722                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
3723                      (unsigned long long)a, (unsigned long long)b);
3724 }
3725
3726 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
3727 {
3728         struct drbd_peer_device *peer_device;
3729         struct drbd_device *device;
3730         struct p_sizes *p = pi->data;
3731         enum determine_dev_size dd = DS_UNCHANGED;
3732         sector_t p_size, p_usize, p_csize, my_usize;
3733         int ldsc = 0; /* local disk size changed */
3734         enum dds_flags ddsf;
3735
3736         peer_device = conn_peer_device(connection, pi->vnr);
3737         if (!peer_device)
3738                 return config_unknown_volume(connection, pi);
3739         device = peer_device->device;
3740
3741         p_size = be64_to_cpu(p->d_size);
3742         p_usize = be64_to_cpu(p->u_size);
3743         p_csize = be64_to_cpu(p->c_size);
3744
3745         /* just store the peer's disk size for now.
3746          * we still need to figure out whether we accept that. */
3747         device->p_size = p_size;
3748
3749         if (get_ldev(device)) {
3750                 rcu_read_lock();
3751                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
3752                 rcu_read_unlock();
3753
3754                 warn_if_differ_considerably(device, "lower level device sizes",
3755                            p_size, drbd_get_max_capacity(device->ldev));
3756                 warn_if_differ_considerably(device, "user requested size",
3757                                             p_usize, my_usize);
3758
3759                 /* if this is the first connect, or an otherwise expected
3760                  * param exchange, choose the minimum */
3761                 if (device->state.conn == C_WF_REPORT_PARAMS)
3762                         p_usize = min_not_zero(my_usize, p_usize);
3763
3764                 /* Never shrink a device with usable data during connect.
3765                    But allow online shrinking if we are connected. */
3766                 if (drbd_new_dev_size(device, device->ldev, p_usize, 0) <
3767                     drbd_get_capacity(device->this_bdev) &&
3768                     device->state.disk >= D_OUTDATED &&
3769                     device->state.conn < C_CONNECTED) {
3770                         drbd_err(device, "The peer's disk size is too small!\n");
3771                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3772                         put_ldev(device);
3773                         return -EIO;
3774                 }
3775
3776                 if (my_usize != p_usize) {
3777                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3778
3779                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3780                         if (!new_disk_conf) {
3781                                 drbd_err(device, "Allocation of new disk_conf failed\n");
3782                                 put_ldev(device);
3783                                 return -ENOMEM;
3784                         }
3785
3786                         mutex_lock(&connection->resource->conf_update);
3787                         old_disk_conf = device->ldev->disk_conf;
3788                         *new_disk_conf = *old_disk_conf;
3789                         new_disk_conf->disk_size = p_usize;
3790
3791                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3792                         mutex_unlock(&connection->resource->conf_update);
3793                         synchronize_rcu();
3794                         kfree(old_disk_conf);
3795
3796                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
3797                                  (unsigned long)p_usize);
3798                 }
3799
3800                 put_ldev(device);
3801         }
3802
3803         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3804         /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
3805            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
3806            drbd_reconsider_max_bio_size(), we can be sure that after
3807            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
3808
3809         ddsf = be16_to_cpu(p->dds_flags);
3810         if (get_ldev(device)) {
3811                 drbd_reconsider_max_bio_size(device, device->ldev);
3812                 dd = drbd_determine_dev_size(device, ddsf, NULL);
3813                 put_ldev(device);
3814                 if (dd == DS_ERROR)
3815                         return -EIO;
3816                 drbd_md_sync(device);
3817         } else {
3818                 /*
3819                  * I am diskless, need to accept the peer's *current* size.
3820                  * I must NOT accept the peers backing disk size,
3821                  * it may have been larger than mine all along...
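                     /* accept the zero-primaries decision only if it sacrifices the
                      * current Secondary: discard our data only while we are Secondary,
                      * discard the peer's only while we are Primary */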
3822                  *
3823                  * At this point, the peer knows more about my disk, or at
3824                  * least about what we last agreed upon, than myself.
3825                  * So if his c_size is less than his d_size, the most likely
3826                  * reason is that *my* d_size was smaller last time we checked.
3827                  *
3828                  * However, if he sends a zero current size,
3829                  * take his (user-capped or) backing disk size anyways.
3830                  */
3831                 drbd_reconsider_max_bio_size(device, NULL);
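                     /* "a ?: b" is the GCC shorthand for "a ? a : b": prefer the peer's
                      * current size, fall back to the user requested size, then to the
                      * peer's backing device size */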
3832                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
3833         }
3834
3835         if (get_ldev(device)) {
3836                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
3837                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
3838                         ldsc = 1;
3839                 }
3840
3841                 put_ldev(device);
3842         }
3843
3844         if (device->state.conn > C_WF_REPORT_PARAMS) {
3845                 if (be64_to_cpu(p->c_size) !=
3846                     drbd_get_capacity(device->this_bdev) || ldsc) {
3847                         /* we have different sizes, probably peer
3848                          * needs to know my new size... */
3849                         drbd_send_sizes(peer_device, 0, ddsf);
3850                 }
3851                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
3852                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
3853                         if (device->state.pdsk >= D_INCONSISTENT &&
3854                             device->state.disk >= D_INCONSISTENT) {
3855                                 if (ddsf & DDSF_NO_RESYNC)
3856                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
3857                                 else
3858                                         resync_after_online_grow(device);
3859                         } else
3860                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
3861                 }
3862         }
3863
3864         return 0;
3865 }
3866
3867 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
3868 {
3869         struct drbd_peer_device *peer_device;
3870         struct drbd_device *device;
3871         struct p_uuids *p = pi->data;
3872         u64 *p_uuid;
3873         int i, updated_uuids = 0;
3874
3875         peer_device = conn_peer_device(connection, pi->vnr);
3876         if (!peer_device)
3877                 return config_unknown_volume(connection, pi);
3878         device = peer_device->device;
3879
3880         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3881         if (!p_uuid) {
3882                 drbd_err(device, "kmalloc of p_uuid failed\n");
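                     /* false is 0 here: the packet is treated as handled and the
                      * connection stays up, we just drop the received UUIDs */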
3883                 return false;
3884         }
3885
3886         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3887                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
3888
3889         kfree(device->p_uuid);
3890         device->p_uuid = p_uuid;
3891
3892         if (device->state.conn < C_CONNECTED &&
3893             device->state.disk < D_INCONSISTENT &&
3894             device->state.role == R_PRIMARY &&
3895             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3896                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
3897                     (unsigned long long)device->ed_uuid);
3898                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
3899                 return -EIO;
3900         }
3901
3902         if (get_ldev(device)) {
3903                 int skip_initial_sync =
3904                         device->state.conn == C_CONNECTED &&
3905                         peer_device->connection->agreed_pro_version >= 90 &&
3906                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3907                         (p_uuid[UI_FLAGS] & 8);
3908                 if (skip_initial_sync) {
3909                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
3910                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
3911                                         "clear_n_write from receive_uuids",
3912                                         BM_LOCKED_TEST_ALLOWED);
3913                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
3914                         _drbd_uuid_set(device, UI_BITMAP, 0);
3915                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3916                                         CS_VERBOSE, NULL);
3917                         drbd_md_sync(device);
3918                         updated_uuids = 1;
3919                 }
3920                 put_ldev(device);
3921         } else if (device->state.disk < D_INCONSISTENT &&
3922                    device->state.role == R_PRIMARY) {
3923                 /* I am a diskless primary, the peer just created a new current UUID
3924                    for me. */
3925                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3926         }
3927
3928         /* Before we test the disk state, wait until any potentially ongoing
3929            cluster-wide state change has finished.  That is important if we are
3930            primary and are detaching from our disk: we need to see the new disk
3931            state... */
3932         mutex_lock(device->state_mutex);
3933         mutex_unlock(device->state_mutex);
3934         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
3935                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
3936
3937         if (updated_uuids)
3938                 drbd_print_uuids(device, "receiver updated UUIDs to");
3939
3940         return 0;
3941 }
3942
3943 /**
3944  * convert_state() - Converts the peer's view of the cluster state to our point of view
3945  * @ps:         The state as seen by the peer.
3946  */
3947 static union drbd_state convert_state(union drbd_state ps)
3948 {
3949         union drbd_state ms;
3950
3951         static enum drbd_conns c_tab[] = {
3952                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3953                 [C_CONNECTED] = C_CONNECTED,
3954
3955                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3956                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3957                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3958                 [C_VERIFY_S]       = C_VERIFY_T,
3959                 [C_MASK]   = C_MASK,
3960         };
3961
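        /* Mirror the peer's perspective: its role is our "peer" role, its
         * disk is our "pdsk", and vice versa. */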
3962         ms.i = ps.i;
3963
3964         ms.conn = c_tab[ps.conn];
3965         ms.peer = ps.role;
3966         ms.role = ps.peer;
3967         ms.pdsk = ps.disk;
3968         ms.disk = ps.pdsk;
3969         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3970
3971         return ms;
3972 }
3973
3974 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
3975 {
3976         struct drbd_peer_device *peer_device;
3977         struct drbd_device *device;
3978         struct p_req_state *p = pi->data;
3979         union drbd_state mask, val;
3980         enum drbd_state_rv rv;
3981
3982         peer_device = conn_peer_device(connection, pi->vnr);
3983         if (!peer_device)
3984                 return -EIO;
3985         device = peer_device->device;
3986
3987         mask.i = be32_to_cpu(p->mask);
3988         val.i = be32_to_cpu(p->val);
3989
3990         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
3991             mutex_is_locked(device->state_mutex)) {
3992                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
3993                 return 0;
3994         }
3995
3996         mask = convert_state(mask);
3997         val = convert_state(val);
3998
3999         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4000         drbd_send_sr_reply(peer_device, rv);
4001
4002         drbd_md_sync(device);
4003
4004         return 0;
4005 }
4006
4007 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4008 {
4009         struct p_req_state *p = pi->data;
4010         union drbd_state mask, val;
4011         enum drbd_state_rv rv;
4012
4013         mask.i = be32_to_cpu(p->mask);
4014         val.i = be32_to_cpu(p->val);
4015
4016         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4017             mutex_is_locked(&connection->cstate_mutex)) {
4018                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4019                 return 0;
4020         }
4021
4022         mask = convert_state(mask);
4023         val = convert_state(val);
4024
4025         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4026         conn_send_sr_reply(connection, rv);
4027
4028         return 0;
4029 }
4030
4031 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4032 {
4033         struct drbd_peer_device *peer_device;
4034         struct drbd_device *device;
4035         struct p_state *p = pi->data;
4036         union drbd_state os, ns, peer_state;
4037         enum drbd_disk_state real_peer_disk;
4038         enum chg_state_flags cs_flags;
4039         int rv;
4040
4041         peer_device = conn_peer_device(connection, pi->vnr);
4042         if (!peer_device)
4043                 return config_unknown_volume(connection, pi);
4044         device = peer_device->device;
4045
4046         peer_state.i = be32_to_cpu(p->state);
4047
4048         real_peer_disk = peer_state.disk;
4049         if (peer_state.disk == D_NEGOTIATING) {
4050                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4051                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4052         }
4053
4054         spin_lock_irq(&device->resource->req_lock);
4055  retry:
4056         os = ns = drbd_read_state(device);
4057         spin_unlock_irq(&device->resource->req_lock);
4058
4059         /* If some other part of the code (asender thread, timeout)
4060          * already decided to close the connection again,
4061          * we must not "re-establish" it here. */
4062         if (os.conn <= C_TEAR_DOWN)
4063                 return -ECONNRESET;
4064
4065         /* If this is the "end of sync" confirmation, the peer disk usually
4066          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For an empty resync
4067          * (0 bits set) that started in PausedSyncT, or if the timing of pause-/
4068          * unpause-sync events has been "just right", the peer disk may
4069          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4070          */
4071         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4072             real_peer_disk == D_UP_TO_DATE &&
4073             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4074                 /* If we are (becoming) SyncSource, but peer is still in sync
4075                  * preparation, ignore its uptodate-ness to avoid flapping, it
4076                  * will change to inconsistent once the peer reaches active
4077                  * syncing states.
4078                  * It may have changed syncer-paused flags, however, so we
4079                  * cannot ignore this completely. */
4080                 if (peer_state.conn > C_CONNECTED &&
4081                     peer_state.conn < C_SYNC_SOURCE)
4082                         real_peer_disk = D_INCONSISTENT;
4083
4084                 /* if peer_state changes to connected at the same time,
4085                  * it explicitly notifies us that it finished resync.
4086                  * Maybe we should finish it up, too? */
4087                 else if (os.conn >= C_SYNC_SOURCE &&
4088                          peer_state.conn == C_CONNECTED) {
4089                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4090                                 drbd_resync_finished(device);
4091                         return 0;
4092                 }
4093         }
4094
4095         /* explicit verify finished notification, stop sector reached. */
4096         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4097             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4098                 ov_out_of_sync_print(device);
4099                 drbd_resync_finished(device);
4100                 return 0;
4101         }
4102
4103         /* peer says his disk is inconsistent, while we think it is uptodate,
4104          * and this happens while the peer still thinks we have a sync going on,
4105          * but we think we are already done with the sync.
4106          * We ignore this to avoid flapping pdsk.
4107          * This should not happen, if the peer is a recent version of drbd. */
4108         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4109             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4110                 real_peer_disk = D_UP_TO_DATE;
4111
4112         if (ns.conn == C_WF_REPORT_PARAMS)
4113                 ns.conn = C_CONNECTED;
4114
4115         if (peer_state.conn == C_AHEAD)
4116                 ns.conn = C_BEHIND;
4117
4118         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4119             get_ldev_if_state(device, D_NEGOTIATING)) {
4120                 int cr; /* consider resync */
4121
4122                 /* if we established a new connection */
4123                 cr  = (os.conn < C_CONNECTED);
4124                 /* if we had an established connection
4125                  * and one of the nodes newly attaches a disk */
4126                 cr |= (os.conn == C_CONNECTED &&
4127                        (peer_state.disk == D_NEGOTIATING ||
4128                         os.disk == D_NEGOTIATING));
4129                 /* if we have both been inconsistent, and the peer has been
4130                  * forced to be UpToDate with --overwrite-data */
4131                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4132                 /* if we had been plain connected, and the admin requested to
4133                  * start a sync by "invalidate" or "invalidate-remote" */
4134                 cr |= (os.conn == C_CONNECTED &&
4135                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4136                                  peer_state.conn <= C_WF_BITMAP_T));
4137
4138                 if (cr)
4139                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4140
4141                 put_ldev(device);
4142                 if (ns.conn == C_MASK) {
4143                         ns.conn = C_CONNECTED;
4144                         if (device->state.disk == D_NEGOTIATING) {
4145                                 drbd_force_state(device, NS(disk, D_FAILED));
4146                         } else if (peer_state.disk == D_NEGOTIATING) {
4147                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4148                                 peer_state.disk = D_DISKLESS;
4149                                 real_peer_disk = D_DISKLESS;
4150                         } else {
4151                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4152                                         return -EIO;
4153                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4154                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4155                                 return -EIO;
4156                         }
4157                 }
4158         }
4159
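        /* Re-take the lock and re-check: if the state changed while it was
         * dropped, start over (the retry label re-reads it under the lock). */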
4160         spin_lock_irq(&device->resource->req_lock);
4161         if (os.i != drbd_read_state(device).i)
4162                 goto retry;
4163         clear_bit(CONSIDER_RESYNC, &device->flags);
4164         ns.peer = peer_state.role;
4165         ns.pdsk = real_peer_disk;
4166         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4167         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4168                 ns.disk = device->new_state_tmp.disk;
4169         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4170         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4171             test_bit(NEW_CUR_UUID, &device->flags)) {
4172                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4173                    for temporary network outages! */
4174                 spin_unlock_irq(&device->resource->req_lock);
4175                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4176                 tl_clear(peer_device->connection);
4177                 drbd_uuid_new_current(device);
4178                 clear_bit(NEW_CUR_UUID, &device->flags);
4179                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4180                 return -EIO;
4181         }
4182         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4183         ns = drbd_read_state(device);
4184         spin_unlock_irq(&device->resource->req_lock);
4185
4186         if (rv < SS_SUCCESS) {
4187                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4188                 return -EIO;
4189         }
4190
4191         if (os.conn > C_WF_REPORT_PARAMS) {
4192                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4193                     peer_state.disk != D_NEGOTIATING ) {
4194                         /* we want resync, peer has not yet decided to sync... */
4195                         /* Nowadays only used when forcing a node into primary role and
4196                            setting its disk to UpToDate with that */
4197                         drbd_send_uuids(peer_device);
4198                         drbd_send_current_state(peer_device);
4199                 }
4200         }
4201
4202         clear_bit(DISCARD_MY_DATA, &device->flags);
4203
4204         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4205
4206         return 0;
4207 }
4208
4209 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4210 {
4211         struct drbd_peer_device *peer_device;
4212         struct drbd_device *device;
4213         struct p_rs_uuid *p = pi->data;
4214
4215         peer_device = conn_peer_device(connection, pi->vnr);
4216         if (!peer_device)
4217                 return -EIO;
4218         device = peer_device->device;
4219
4220         wait_event(device->misc_wait,
4221                    device->state.conn == C_WF_SYNC_UUID ||
4222                    device->state.conn == C_BEHIND ||
4223                    device->state.conn < C_CONNECTED ||
4224                    device->state.disk < D_NEGOTIATING);
4225
4226         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4227
4228         /* Here the _drbd_uuid_ functions are right, current should
4229            _not_ be rotated into the history */
4230         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4231                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4232                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4233
4234                 drbd_print_uuids(device, "updated sync uuid");
4235                 drbd_start_resync(device, C_SYNC_TARGET);
4236
4237                 put_ldev(device);
4238         } else
4239                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4240
4241         return 0;
4242 }
4243
4244 /**
4245  * receive_bitmap_plain() - receive one plain (uncompressed) bitmap packet
4246  *
4247  * Return 0 when done, 1 when another iteration is needed, and a negative error
4248  * code upon failure.
4249  */
4250 static int
4251 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4252                      unsigned long *p, struct bm_xfer_ctx *c)
4253 {
4254         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4255                                  drbd_header_size(peer_device->connection);
4256         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4257                                        c->bm_words - c->word_offset);
4258         unsigned int want = num_words * sizeof(*p);
4259         int err;
4260
4261         if (want != size) {
4262                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4263                 return -EIO;
4264         }
4265         if (want == 0)
4266                 return 0;
4267         err = drbd_recv_all(peer_device->connection, p, want);
4268         if (err)
4269                 return err;
4270
4271         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4272
4273         c->word_offset += num_words;
4274         c->bit_offset = c->word_offset * BITS_PER_LONG;
4275         if (c->bit_offset > c->bm_bits)
4276                 c->bit_offset = c->bm_bits;
4277
4278         return 1;
4279 }
4280
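/* Accessors for the "encoding" byte of a compressed bitmap packet:
 * bits 0-3  bitmap code (e.g. RLE_VLI_Bits),
 * bits 4-6  number of pad bits in the last byte of the bit stream,
 * bit 7     whether the first run-length describes set bits. */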
4281 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4282 {
4283         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4284 }
4285
4286 static int dcbp_get_start(struct p_compressed_bm *p)
4287 {
4288         return (p->encoding & 0x80) != 0;
4289 }
4290
4291 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4292 {
4293         return (p->encoding >> 4) & 0x7;
4294 }
4295
4296 /**
4297  * recv_bm_rle_bits() - decode an RLE/VLI encoded chunk of the bitmap
4298  *
4299  * Return 0 when done, 1 when another iteration is needed, and a negative error
4300  * code upon failure.
4301  */
4302 static int
4303 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4304                 struct p_compressed_bm *p,
4305                  struct bm_xfer_ctx *c,
4306                  unsigned int len)
4307 {
4308         struct bitstream bs;
4309         u64 look_ahead;
4310         u64 rl;
4311         u64 tmp;
4312         unsigned long s = c->bit_offset;
4313         unsigned long e;
4314         int toggle = dcbp_get_start(p);
4315         int have;
4316         int bits;
4317
4318         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4319
4320         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4321         if (bits < 0)
4322                 return -EIO;
4323
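        /* look_ahead buffers up to 64 not-yet-consumed bits of the VLI stream,
         * "have" of which are valid.  Each iteration decodes one run length;
         * runs alternate between clear and set bits, and only the set runs are
         * applied to the bitmap. */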
4324         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4325                 bits = vli_decode_bits(&rl, look_ahead);
4326                 if (bits <= 0)
4327                         return -EIO;
4328
4329                 if (toggle) {
4330                         e = s + rl - 1;
4331                         if (e >= c->bm_bits) {
4332                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4333                                 return -EIO;
4334                         }
4335                         _drbd_bm_set_bits(peer_device->device, s, e);
4336                 }
4337
4338                 if (have < bits) {
4339                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4340                                 have, bits, look_ahead,
4341                                 (unsigned int)(bs.cur.b - p->code),
4342                                 (unsigned int)bs.buf_len);
4343                         return -EIO;
4344                 }
4345                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4346                 if (likely(bits < 64))
4347                         look_ahead >>= bits;
4348                 else
4349                         look_ahead = 0;
4350                 have -= bits;
4351
4352                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4353                 if (bits < 0)
4354                         return -EIO;
4355                 look_ahead |= tmp << have;
4356                 have += bits;
4357         }
4358
4359         c->bit_offset = s;
4360         bm_xfer_ctx_bit_to_word_offset(c);
4361
4362         return (s != c->bm_bits);
4363 }
4364
4365 /**
4366  * decode_bitmap_c() - decode one compressed bitmap packet
4367  *
4368  * Return 0 when done, 1 when another iteration is needed, and a negative error
4369  * code upon failure.
4370  */
4371 static int
4372 decode_bitmap_c(struct drbd_peer_device *peer_device,
4373                 struct p_compressed_bm *p,
4374                 struct bm_xfer_ctx *c,
4375                 unsigned int len)
4376 {
4377         if (dcbp_get_code(p) == RLE_VLI_Bits)
4378                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4379
4380         /* other variants had been implemented for evaluation,
4381          * but have been dropped as this one turned out to be "best"
4382          * during all our tests. */
4383
4384         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4385         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4386         return -EIO;
4387 }
4388
4389 void INFO_bm_xfer_stats(struct drbd_device *device,
4390                 const char *direction, struct bm_xfer_ctx *c)
4391 {
4392         /* what would it take to transfer it "plaintext" */
4393         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4394         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4395         unsigned int plain =
4396                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4397                 c->bm_words * sizeof(unsigned long);
4398         unsigned int total = c->bytes[0] + c->bytes[1];
4399         unsigned int r;
4400
4401         /* total cannot be zero, but just in case: */
4402         if (total == 0)
4403                 return;
4404
4405         /* don't report if not compressed */
4406         if (total >= plain)
4407                 return;
4408
4409         /* total < plain. check for overflow, still */
4410         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4411                                     : (1000 * total / plain);
4412
4413         if (r > 1000)
4414                 r = 1000;
4415
4416         r = 1000 - r;
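        /* r is now the saving in per mille; printed below as a percentage
         * with one decimal place. */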
4417         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4418              "total %u; compression: %u.%u%%\n",
4419                         direction,
4420                         c->bytes[1], c->packets[1],
4421                         c->bytes[0], c->packets[0],
4422                         total, r/10, r % 10);
4423 }
4424
4425 /* Since we are processing the bitfield from lower addresses to higher,
4426    it does not matter whether we process it in 32 bit chunks or 64 bit
4427    chunks, as long as it is little endian. (Understand it as a byte stream,
4428    beginning with the lowest byte...) If we used big endian,
4429    we would need to process it from the highest address to the lowest
4430    in order to be agnostic to the 32 vs 64 bit issue.
4431
4432    Returns 0 on success, a negative error code otherwise. */
4433 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4434 {
4435         struct drbd_peer_device *peer_device;
4436         struct drbd_device *device;
4437         struct bm_xfer_ctx c;
4438         int err;
4439
4440         peer_device = conn_peer_device(connection, pi->vnr);
4441         if (!peer_device)
4442                 return -EIO;
4443         device = peer_device->device;
4444
4445         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4446         /* you are supposed to send additional out-of-sync information
4447          * if you actually set bits during this phase */
4448
4449         c = (struct bm_xfer_ctx) {
4450                 .bm_bits = drbd_bm_bits(device),
4451                 .bm_words = drbd_bm_words(device),
4452         };
4453
4454         for(;;) {
4455                 if (pi->cmd == P_BITMAP)
4456                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4457                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4458                         /* MAYBE: sanity check that we speak proto >= 90,
4459                          * and the feature is enabled! */
4460                         struct p_compressed_bm *p = pi->data;
4461
4462                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4463                                 drbd_err(device, "ReportCBitmap packet too large\n");
4464                                 err = -EIO;
4465                                 goto out;
4466                         }
4467                         if (pi->size <= sizeof(*p)) {
4468                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4469                                 err = -EIO;
4470                                 goto out;
4471                         }
4472                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4473                         if (err)
4474                                goto out;
4475                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4476                 } else {
4477                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)\n", pi->cmd);
4478                         err = -EIO;
4479                         goto out;
4480                 }
4481
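                /* stats: index 1 counts plain P_BITMAP packets,
                 * index 0 the compressed ones */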
4482                 c.packets[pi->cmd == P_BITMAP]++;
4483                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4484
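                /* err == 0: bitmap transfer complete; err < 0: error;
                 * err > 0: more packets to come */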
4485                 if (err <= 0) {
4486                         if (err < 0)
4487                                 goto out;
4488                         break;
4489                 }
4490                 err = drbd_recv_header(peer_device->connection, pi);
4491                 if (err)
4492                         goto out;
4493         }
4494
4495         INFO_bm_xfer_stats(device, "receive", &c);
4496
4497         if (device->state.conn == C_WF_BITMAP_T) {
4498                 enum drbd_state_rv rv;
4499
4500                 err = drbd_send_bitmap(device);
4501                 if (err)
4502                         goto out;
4503                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4504                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4505                 D_ASSERT(device, rv == SS_SUCCESS);
4506         } else if (device->state.conn != C_WF_BITMAP_S) {
4507                 /* admin may have requested C_DISCONNECTING,
4508                  * other threads may have noticed network errors */
4509                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4510                     drbd_conn_str(device->state.conn));
4511         }
4512         err = 0;
4513
4514  out:
4515         drbd_bm_unlock(device);
4516         if (!err && device->state.conn == C_WF_BITMAP_S)
4517                 drbd_start_resync(device, C_SYNC_SOURCE);
4518         return err;
4519 }
4520
4521 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4522 {
4523         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4524                  pi->cmd, pi->size);
4525
4526         return ignore_remaining_packet(connection, pi);
4527 }
4528
4529 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4530 {
4531         /* Make sure we've acked all the TCP data associated
4532          * with the data requests being unplugged */
4533         drbd_tcp_quickack(connection->data.socket);
4534
4535         return 0;
4536 }
4537
4538 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4539 {
4540         struct drbd_peer_device *peer_device;
4541         struct drbd_device *device;
4542         struct p_block_desc *p = pi->data;
4543
4544         peer_device = conn_peer_device(connection, pi->vnr);
4545         if (!peer_device)
4546                 return -EIO;
4547         device = peer_device->device;
4548
4549         switch (device->state.conn) {
4550         case C_WF_SYNC_UUID:
4551         case C_WF_BITMAP_T:
4552         case C_BEHIND:
4553                         break;
4554         default:
4555                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4556                                 drbd_conn_str(device->state.conn));
4557         }
4558
4559         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4560
4561         return 0;
4562 }
4563
4564 struct data_cmd {
4565         int expect_payload;
4566         size_t pkt_size;
4567         int (*fn)(struct drbd_connection *, struct packet_info *);
4568 };
4569
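/* Dispatch table for the data socket, indexed by packet type: whether a
 * payload beyond the fixed sub-header is allowed, the sub-header size,
 * and the handler function. */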
4570 static struct data_cmd drbd_cmd_handler[] = {
4571         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4572         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4573         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4574         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4575         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4576         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4577         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4578         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4579         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4580         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4581         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4582         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4583         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4584         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4585         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4586         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4587         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4588         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4589         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4590         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4591         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4592         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4593         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4594         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4595         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4596 };
4597
4598 static void drbdd(struct drbd_connection *connection)
4599 {
4600         struct packet_info pi;
4601         size_t shs; /* sub header size */
4602         int err;
4603
4604         while (get_t_state(&connection->receiver) == RUNNING) {
4605                 struct data_cmd *cmd;
4606
4607                 drbd_thread_current_set_cpu(&connection->receiver);
4608                 update_receiver_timing_details(connection, drbd_recv_header);
4609                 if (drbd_recv_header(connection, &pi))
4610                         goto err_out;
4611
4612                 cmd = &drbd_cmd_handler[pi.cmd];
4613                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4614                         drbd_err(connection, "Unexpected data packet %s (0x%04x)\n",
4615                                  cmdname(pi.cmd), pi.cmd);
4616                         goto err_out;
4617                 }
4618
4619                 shs = cmd->pkt_size;
4620                 if (pi.size > shs && !cmd->expect_payload) {
4621                         drbd_err(connection, "No payload expected %s l:%d\n",
4622                                  cmdname(pi.cmd), pi.size);
4623                         goto err_out;
4624                 }
4625
4626                 if (shs) {
4627                         update_receiver_timing_details(connection, drbd_recv_all_warn);
4628                         err = drbd_recv_all_warn(connection, pi.data, shs);
4629                         if (err)
4630                                 goto err_out;
4631                         pi.size -= shs;
4632                 }
4633
4634                 update_receiver_timing_details(connection, cmd->fn);
4635                 err = cmd->fn(connection, &pi);
4636                 if (err) {
4637                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
4638                                  cmdname(pi.cmd), err, pi.size);
4639                         goto err_out;
4640                 }
4641         }
4642         return;
4643
4644     err_out:
4645         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4646 }
4647
4648 static void conn_disconnect(struct drbd_connection *connection)
4649 {
4650         struct drbd_peer_device *peer_device;
4651         enum drbd_conns oc;
4652         int vnr;
4653
4654         if (connection->cstate == C_STANDALONE)
4655                 return;
4656
4657         /* We are about to start the cleanup after connection loss.
4658          * Make sure drbd_make_request knows about that.
4659          * Usually we should be in some network failure state already,
4660          * but just in case we are not, we fix it up here.
4661          */
4662         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4663
4664         /* asender does not clean up anything. it must not interfere, either */
4665         drbd_thread_stop(&connection->asender);
4666         drbd_free_sock(connection);
4667
4668         rcu_read_lock();
4669         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
4670                 struct drbd_device *device = peer_device->device;
4671                 kref_get(&device->kref);
4672                 rcu_read_unlock();
4673                 drbd_disconnected(peer_device);
4674                 kref_put(&device->kref, drbd_destroy_device);
4675                 rcu_read_lock();
4676         }
4677         rcu_read_unlock();
4678
4679         if (!list_empty(&connection->current_epoch->list))
4680                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
4681         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4682         atomic_set(&connection->current_epoch->epoch_size, 0);
4683         connection->send.seen_any_write_yet = false;
4684
4685         drbd_info(connection, "Connection closed\n");
4686
4687         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
4688                 conn_try_outdate_peer_async(connection);
4689
4690         spin_lock_irq(&connection->resource->req_lock);
4691         oc = connection->cstate;
4692         if (oc >= C_UNCONNECTED)
4693                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4694
4695         spin_unlock_irq(&connection->resource->req_lock);
4696
4697         if (oc == C_DISCONNECTING)
4698                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4699 }
4700
4701 static int drbd_disconnected(struct drbd_peer_device *peer_device)
4702 {
4703         struct drbd_device *device = peer_device->device;
4704         unsigned int i;
4705
4706         /* wait for current activity to cease. */
4707         spin_lock_irq(&device->resource->req_lock);
4708         _drbd_wait_ee_list_empty(device, &device->active_ee);
4709         _drbd_wait_ee_list_empty(device, &device->sync_ee);
4710         _drbd_wait_ee_list_empty(device, &device->read_ee);
4711         spin_unlock_irq(&device->resource->req_lock);
4712
4713         /* We do not have data structures that would allow us to
4714          * get the rs_pending_cnt down to 0 again.
4715          *  * On C_SYNC_TARGET we do not have any data structures describing
4716          *    the pending RSDataRequest's we have sent.
4717          *  * On C_SYNC_SOURCE there is no data structure that tracks
4718          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4719          *  And no, it is not the sum of the reference counts in the
4720          *  resync_LRU. The resync_LRU tracks the whole operation including
4721          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4722          *  on the fly. */
4723         drbd_rs_cancel_all(device);
4724         device->rs_total = 0;
4725         device->rs_failed = 0;
4726         atomic_set(&device->rs_pending_cnt, 0);
4727         wake_up(&device->misc_wait);
4728
4729         del_timer_sync(&device->resync_timer);
4730         resync_timer_fn((unsigned long)device);
4731
4732         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4733          * w_make_resync_request etc. which may still be on the worker queue
4734          * to be "canceled" */
4735         drbd_flush_workqueue(&peer_device->connection->sender_work);
4736
4737         drbd_finish_peer_reqs(device);
4738
4739         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4740            might have queued work again. The one before drbd_finish_peer_reqs() is
4741            necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
4742         drbd_flush_workqueue(&peer_device->connection->sender_work);
4743
4744         /* need to do it again, drbd_finish_peer_reqs() may have populated it
4745          * again via drbd_try_clear_on_disk_bm(). */
4746         drbd_rs_cancel_all(device);
4747
4748         kfree(device->p_uuid);
4749         device->p_uuid = NULL;
4750
4751         if (!drbd_suspended(device))
4752                 tl_clear(peer_device->connection);
4753
4754         drbd_md_sync(device);
4755
4756         /* serialize with bitmap writeout triggered by the state change,
4757          * if any. */
4758         wait_event(device->misc_wait, !test_bit(BITMAP_IO, &device->flags));
4759
4760         /* tcp_close and release of sendpage pages can be deferred.  I don't
4761          * want to use SO_LINGER, because apparently it can be deferred for
4762          * more than 20 seconds (longest time I checked).
4763          *
4764          * Actually we don't care for exactly when the network stack does its
4765          * put_page(), but release our reference on these pages right here.
4766          */
4767         i = drbd_free_peer_reqs(device, &device->net_ee);
4768         if (i)
4769                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
4770         i = atomic_read(&device->pp_in_use_by_net);
4771         if (i)
4772                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
4773         i = atomic_read(&device->pp_in_use);
4774         if (i)
4775                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
4776
4777         D_ASSERT(device, list_empty(&device->read_ee));
4778         D_ASSERT(device, list_empty(&device->active_ee));
4779         D_ASSERT(device, list_empty(&device->sync_ee));
4780         D_ASSERT(device, list_empty(&device->done_ee));
4781
4782         return 0;
4783 }
4784
4785 /*
4786  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4787  * we can agree on is stored in agreed_pro_version.
4788  *
4789  * feature flags and the reserved array should be enough room for future
4790  * enhancements of the handshake protocol, and possible plugins...
4791  *
4792  * for now, they are expected to be zero, but ignored.
4793  */
4794 static int drbd_send_features(struct drbd_connection *connection)
4795 {
4796         struct drbd_socket *sock;
4797         struct p_connection_features *p;
4798
4799         sock = &connection->data;
4800         p = conn_prepare_command(connection, sock);
4801         if (!p)
4802                 return -EIO;
4803         memset(p, 0, sizeof(*p));
4804         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4805         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4806         p->feature_flags = cpu_to_be32(PRO_FEATURES);
4807         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4808 }
4809
4810 /*
4811  * return values:
4812  *   1 yes, we have a valid connection
4813  *   0 oops, did not work out, please try again
4814  *  -1 peer talks different language,
4815  *     no point in trying again, please go standalone.
4816  */
4817 static int drbd_do_features(struct drbd_connection *connection)
4818 {
4819         /* ASSERT current == connection->receiver ... */
4820         struct p_connection_features *p;
4821         const int expect = sizeof(struct p_connection_features);
4822         struct packet_info pi;
4823         int err;
4824
4825         err = drbd_send_features(connection);
4826         if (err)
4827                 return 0;
4828
4829         err = drbd_recv_header(connection, &pi);
4830         if (err)
4831                 return 0;
4832
4833         if (pi.cmd != P_CONNECTION_FEATURES) {
4834                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4835                          cmdname(pi.cmd), pi.cmd);
4836                 return -1;
4837         }
4838
4839         if (pi.size != expect) {
4840                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
4841                      expect, pi.size);
4842                 return -1;
4843         }
4844
4845         p = pi.data;
4846         err = drbd_recv_all_warn(connection, p, expect);
4847         if (err)
4848                 return 0;
4849
4850         p->protocol_min = be32_to_cpu(p->protocol_min);
4851         p->protocol_max = be32_to_cpu(p->protocol_max);
4852         if (p->protocol_max == 0)
4853                 p->protocol_max = p->protocol_min;
4854
4855         if (PRO_VERSION_MAX < p->protocol_min ||
4856             PRO_VERSION_MIN > p->protocol_max)
4857                 goto incompat;
4858
4859         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4860         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
4861
4862         drbd_info(connection, "Handshake successful: "
4863              "Agreed network protocol version %d\n", connection->agreed_pro_version);
4864
4865         drbd_info(connection, "Agreed to%ssupport TRIM on protocol level\n",
4866                   connection->agreed_features & FF_TRIM ? " " : " not ");
4867
4868         return 1;
4869
4870  incompat:
4871         drbd_err(connection, "incompatible DRBD dialects: "
4872             "I support %d-%d, peer supports %d-%d\n",
4873             PRO_VERSION_MIN, PRO_VERSION_MAX,
4874             p->protocol_min, p->protocol_max);
4875         return -1;
4876 }
4877
4878 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
4879 static int drbd_do_auth(struct drbd_connection *connection)
4880 {
4881         drbd_err(connection, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
4882         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
4883         return -1;
4884 }
4885 #else
4886 #define CHALLENGE_LEN 64
4887
4888 /* Return value:
4889         1 - auth succeeded,
4890         0 - failed, try again (network error),
4891         -1 - auth failed, don't try again.
4892 */
4893
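/* Mutual CRAM-HMAC authentication: each side sends a random challenge,
 * answers the peer's challenge with an HMAC keyed by the shared secret,
 * and verifies the answer it gets back against the HMAC of its own
 * challenge. */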
4894 static int drbd_do_auth(struct drbd_connection *connection)
4895 {
4896         struct drbd_socket *sock;
4897         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4898         struct scatterlist sg;
4899         char *response = NULL;
4900         char *right_response = NULL;
4901         char *peers_ch = NULL;
4902         unsigned int key_len;
4903         char secret[SHARED_SECRET_MAX]; /* 64 byte */
4904         unsigned int resp_size;
4905         struct hash_desc desc;
4906         struct packet_info pi;
4907         struct net_conf *nc;
4908         int err, rv;
4909
4910         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4911
4912         rcu_read_lock();
4913         nc = rcu_dereference(connection->net_conf);
4914         key_len = strlen(nc->shared_secret);
4915         memcpy(secret, nc->shared_secret, key_len);
4916         rcu_read_unlock();
4917
4918         desc.tfm = connection->cram_hmac_tfm;
4919         desc.flags = 0;
4920
4921         rv = crypto_hash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
4922         if (rv) {
4923                 drbd_err(connection, "crypto_hash_setkey() failed with %d\n", rv);
4924                 rv = -1;
4925                 goto fail;
4926         }
4927
4928         get_random_bytes(my_challenge, CHALLENGE_LEN);
4929
4930         sock = &connection->data;
4931         if (!conn_prepare_command(connection, sock)) {
4932                 rv = 0;
4933                 goto fail;
4934         }
4935         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
4936                                 my_challenge, CHALLENGE_LEN);
4937         if (!rv)
4938                 goto fail;
4939
4940         err = drbd_recv_header(connection, &pi);
4941         if (err) {
4942                 rv = 0;
4943                 goto fail;
4944         }
4945
4946         if (pi.cmd != P_AUTH_CHALLENGE) {
4947                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4948                          cmdname(pi.cmd), pi.cmd);
4949                 rv = 0;
4950                 goto fail;
4951         }
4952
4953         if (pi.size > CHALLENGE_LEN * 2) {
4954                 drbd_err(connection, "AuthChallenge payload too big.\n");
4955                 rv = -1;
4956                 goto fail;
4957         }
4958
4959         if (pi.size < CHALLENGE_LEN) {
4960                 drbd_err(connection, "AuthChallenge payload too small.\n");
4961                 rv = -1;
4962                 goto fail;
4963         }
4964
4965         peers_ch = kmalloc(pi.size, GFP_NOIO);
4966         if (peers_ch == NULL) {
4967                 drbd_err(connection, "kmalloc of peers_ch failed\n");
4968                 rv = -1;
4969                 goto fail;
4970         }
4971
4972         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
4973         if (err) {
4974                 rv = 0;
4975                 goto fail;
4976         }
4977
4978         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
4979                 drbd_err(connection, "Peer presented the same challenge!\n");
4980                 rv = -1;
4981                 goto fail;
4982         }
4983
4984         resp_size = crypto_hash_digestsize(connection->cram_hmac_tfm);
4985         response = kmalloc(resp_size, GFP_NOIO);
4986         if (response == NULL) {
4987                 drbd_err(connection, "kmalloc of response failed\n");
4988                 rv = -1;
4989                 goto fail;
4990         }
4991
4992         sg_init_table(&sg, 1);
4993         sg_set_buf(&sg, peers_ch, pi.size);
4994
4995         rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4996         if (rv) {
4997                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
4998                 rv = -1;
4999                 goto fail;
5000         }
5001
5002         if (!conn_prepare_command(connection, sock)) {
5003                 rv = 0;
5004                 goto fail;
5005         }
5006         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5007                                 response, resp_size);
5008         if (!rv)
5009                 goto fail;
5010
5011         err = drbd_recv_header(connection, &pi);
5012         if (err) {
5013                 rv = 0;
5014                 goto fail;
5015         }
5016
5017         if (pi.cmd != P_AUTH_RESPONSE) {
5018                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5019                          cmdname(pi.cmd), pi.cmd);
5020                 rv = 0;
5021                 goto fail;
5022         }
5023
5024         if (pi.size != resp_size) {
5025                 drbd_err(connection, "AuthResponse payload of unexpected size\n");
5026                 rv = 0;
5027                 goto fail;
5028         }
5029
5030         err = drbd_recv_all_warn(connection, response , resp_size);
5031         if (err) {
5032                 rv = 0;
5033                 goto fail;
5034         }
5035
5036         right_response = kmalloc(resp_size, GFP_NOIO);
5037         if (right_response == NULL) {
5038                 drbd_err(connection, "kmalloc of right_response failed\n");
5039                 rv = -1;
5040                 goto fail;
5041         }
5042
5043         sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
5044
5045         rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
5046         if (rv) {
5047                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5048                 rv = -1;
5049                 goto fail;
5050         }
5051
5052         rv = !memcmp(response, right_response, resp_size);
5053
5054         if (rv)
5055                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5056                      resp_size);
5057         else
5058                 rv = -1;
5059
5060  fail:
5061         kfree(peers_ch);
5062         kfree(response);
5063         kfree(right_response);
5064
5065         return rv;
5066 }
5067 #endif
5068
5069 int drbd_receiver(struct drbd_thread *thi)
5070 {
5071         struct drbd_connection *connection = thi->connection;
5072         int h;
5073
5074         drbd_info(connection, "receiver (re)started\n");
5075
5076         do {
5077                 h = conn_connect(connection);
5078                 if (h == 0) {
5079                         conn_disconnect(connection);
5080                         schedule_timeout_interruptible(HZ);
5081                 }
5082                 if (h == -1) {
5083                         drbd_warn(connection, "Discarding network configuration.\n");
5084                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5085                 }
5086         } while (h == 0);
5087
5088         if (h > 0)
5089                 drbdd(connection);
5090
5091         conn_disconnect(connection);
5092
5093         drbd_info(connection, "receiver terminated\n");
5094         return 0;
5095 }
5096
5097 /* ********* acknowledge sender ******** */
5098
5099 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5100 {
5101         struct p_req_state_reply *p = pi->data;
5102         int retcode = be32_to_cpu(p->retcode);
5103
5104         if (retcode >= SS_SUCCESS) {
5105                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5106         } else {
5107                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5108                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5109                          drbd_set_st_err_str(retcode), retcode);
5110         }
5111         wake_up(&connection->ping_wait);
5112
5113         return 0;
5114 }
5115
5116 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5117 {
5118         struct drbd_peer_device *peer_device;
5119         struct drbd_device *device;
5120         struct p_req_state_reply *p = pi->data;
5121         int retcode = be32_to_cpu(p->retcode);
5122
5123         peer_device = conn_peer_device(connection, pi->vnr);
5124         if (!peer_device)
5125                 return -EIO;
5126         device = peer_device->device;
5127
5128         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5129                 D_ASSERT(device, connection->agreed_pro_version < 100);
5130                 return got_conn_RqSReply(connection, pi);
5131         }
5132
5133         if (retcode >= SS_SUCCESS) {
5134                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5135         } else {
5136                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5137                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5138                         drbd_set_st_err_str(retcode), retcode);
5139         }
5140         wake_up(&device->state_wait);
5141
5142         return 0;
5143 }
5144
5145 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5146 {
5147         return drbd_send_ping_ack(connection);
5148
5149 }
5150
5151 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5152 {
5153         /* restore idle timeout */
5154         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5155         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5156                 wake_up(&connection->ping_wait);
5157
5158         return 0;
5159 }
5160
5161 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5162 {
5163         struct drbd_peer_device *peer_device;
5164         struct drbd_device *device;
5165         struct p_block_ack *p = pi->data;
5166         sector_t sector = be64_to_cpu(p->sector);
5167         int blksize = be32_to_cpu(p->blksize);
5168
5169         peer_device = conn_peer_device(connection, pi->vnr);
5170         if (!peer_device)
5171                 return -EIO;
5172         device = peer_device->device;
5173
5174         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5175
5176         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5177
5178         if (get_ldev(device)) {
5179                 drbd_rs_complete_io(device, sector);
5180                 drbd_set_in_sync(device, sector, blksize);
5181                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5182                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5183                 put_ldev(device);
5184         }
5185         dec_rs_pending(device);
5186         atomic_add(blksize >> 9, &device->rs_sect_in);
5187
5188         return 0;
5189 }
5190
5191 static int
5192 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5193                               struct rb_root *root, const char *func,
5194                               enum drbd_req_event what, bool missing_ok)
5195 {
5196         struct drbd_request *req;
5197         struct bio_and_error m;
5198
5199         spin_lock_irq(&device->resource->req_lock);
5200         req = find_request(device, root, id, sector, missing_ok, func);
5201         if (unlikely(!req)) {
5202                 spin_unlock_irq(&device->resource->req_lock);
5203                 return -EIO;
5204         }
5205         __req_mod(req, what, &m);
5206         spin_unlock_irq(&device->resource->req_lock);
5207
5208         if (m.bio)
5209                 complete_master_bio(device, &m);
5210         return 0;
5211 }
5212
5213 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5214 {
5215         struct drbd_peer_device *peer_device;
5216         struct drbd_device *device;
5217         struct p_block_ack *p = pi->data;
5218         sector_t sector = be64_to_cpu(p->sector);
5219         int blksize = be32_to_cpu(p->blksize);
5220         enum drbd_req_event what;
5221
5222         peer_device = conn_peer_device(connection, pi->vnr);
5223         if (!peer_device)
5224                 return -EIO;
5225         device = peer_device->device;
5226
5227         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5228
5229         if (p->block_id == ID_SYNCER) {
5230                 drbd_set_in_sync(device, sector, blksize);
5231                 dec_rs_pending(device);
5232                 return 0;
5233         }
5234         switch (pi->cmd) {
5235         case P_RS_WRITE_ACK:
5236                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5237                 break;
5238         case P_WRITE_ACK:
5239                 what = WRITE_ACKED_BY_PEER;
5240                 break;
5241         case P_RECV_ACK:
5242                 what = RECV_ACKED_BY_PEER;
5243                 break;
5244         case P_SUPERSEDED:
5245                 what = CONFLICT_RESOLVED;
5246                 break;
5247         case P_RETRY_WRITE:
5248                 what = POSTPONE_WRITE;
5249                 break;
5250         default:
5251                 BUG();
5252         }
5253
5254         return validate_req_change_req_state(device, p->block_id, sector,
5255                                              &device->write_requests, __func__,
5256                                              what, false);
5257 }
5258
5259 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5260 {
5261         struct drbd_peer_device *peer_device;
5262         struct drbd_device *device;
5263         struct p_block_ack *p = pi->data;
5264         sector_t sector = be64_to_cpu(p->sector);
5265         int size = be32_to_cpu(p->blksize);
5266         int err;
5267
5268         peer_device = conn_peer_device(connection, pi->vnr);
5269         if (!peer_device)
5270                 return -EIO;
5271         device = peer_device->device;
5272
5273         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5274
5275         if (p->block_id == ID_SYNCER) {
5276                 dec_rs_pending(device);
5277                 drbd_rs_failed_io(device, sector, size);
5278                 return 0;
5279         }
5280
5281         err = validate_req_change_req_state(device, p->block_id, sector,
5282                                             &device->write_requests, __func__,
5283                                             NEG_ACKED, true);
5284         if (err) {
5285                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5286                    The master bio might already be completed, therefore the
5287                    request is no longer in the write_requests tree. */
5288                 /* In Protocol B we might already have got a P_RECV_ACK
5289                    but then get a P_NEG_ACK afterwards. */
5290                 drbd_set_out_of_sync(device, sector, size);
5291         }
5292         return 0;
5293 }
5294
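/* The peer could not service a read request: log it and fail the
 * corresponding request in the read_requests tree. */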
5295 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5296 {
5297         struct drbd_peer_device *peer_device;
5298         struct drbd_device *device;
5299         struct p_block_ack *p = pi->data;
5300         sector_t sector = be64_to_cpu(p->sector);
5301
5302         peer_device = conn_peer_device(connection, pi->vnr);
5303         if (!peer_device)
5304                 return -EIO;
5305         device = peer_device->device;
5306
5307         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5308
5309         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5310             (unsigned long long)sector, be32_to_cpu(p->blksize));
5311
5312         return validate_req_change_req_state(device, p->block_id, sector,
5313                                              &device->read_requests, __func__,
5314                                              NEG_ACKED, false);
5315 }
5316
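/*
 * Negative reply to a resync data request (P_NEG_RS_DREPLY) or a cancelled
 * one (P_RS_CANCEL is routed here as well, see asender_tbl).  Both complete
 * the resync I/O; only P_NEG_RS_DREPLY marks the range as failed.
 */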
5317 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5318 {
5319         struct drbd_peer_device *peer_device;
5320         struct drbd_device *device;
5321         sector_t sector;
5322         int size;
5323         struct p_block_ack *p = pi->data;
5324
5325         peer_device = conn_peer_device(connection, pi->vnr);
5326         if (!peer_device)
5327                 return -EIO;
5328         device = peer_device->device;
5329
5330         sector = be64_to_cpu(p->sector);
5331         size = be32_to_cpu(p->blksize);
5332
5333         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5334
5335         dec_rs_pending(device);
5336
5337         if (get_ldev_if_state(device, D_FAILED)) {
5338                 drbd_rs_complete_io(device, sector);
5339                 switch (pi->cmd) {
5340                 case P_NEG_RS_DREPLY:
5341                         drbd_rs_failed_io(device, sector, size);
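                         /* fall through */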
5342                 case P_RS_CANCEL:
5343                         break;
5344                 default:
5345                         BUG();
5346                 }
5347                 put_ldev(device);
5348         }
5349
5350         return 0;
5351 }
5352
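/*
 * The peer confirms a write barrier: tl_release() completes the corresponding
 * epoch in the transfer log.  If a device is in Ahead mode and no longer has
 * application writes in flight, arm its start_resync_timer so it can become
 * sync source again.
 */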
5353 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5354 {
5355         struct p_barrier_ack *p = pi->data;
5356         struct drbd_peer_device *peer_device;
5357         int vnr;
5358
5359         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5360
5361         rcu_read_lock();
5362         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5363                 struct drbd_device *device = peer_device->device;
5364
5365                 if (device->state.conn == C_AHEAD &&
5366                     atomic_read(&device->ap_in_flight) == 0 &&
5367                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5368                         device->start_resync_timer.expires = jiffies + HZ;
5369                         add_timer(&device->start_resync_timer);
5370                 }
5371         }
5372         rcu_read_unlock();
5373
5374         return 0;
5375 }
5376
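/*
 * One P_OV_RESULT arrives per online-verify request.  Record blocks the peer
 * found out of sync, keep the progress marks up to date, and once the last
 * reply is in (ov_left == 0) queue w_ov_finished to wrap up the verify run.
 */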
5377 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5378 {
5379         struct drbd_peer_device *peer_device;
5380         struct drbd_device *device;
5381         struct p_block_ack *p = pi->data;
5382         struct drbd_device_work *dw;
5383         sector_t sector;
5384         int size;
5385
5386         peer_device = conn_peer_device(connection, pi->vnr);
5387         if (!peer_device)
5388                 return -EIO;
5389         device = peer_device->device;
5390
5391         sector = be64_to_cpu(p->sector);
5392         size = be32_to_cpu(p->blksize);
5393
5394         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5395
5396         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5397                 drbd_ov_out_of_sync_found(device, sector, size);
5398         else
5399                 ov_out_of_sync_print(device);
5400
5401         if (!get_ldev(device))
5402                 return 0;
5403
5404         drbd_rs_complete_io(device, sector);
5405         dec_rs_pending(device);
5406
5407         --device->ov_left;
5408
5409         /* let's advance progress step marks only for every other megabyte */
5410         if ((device->ov_left & 0x200) == 0x200)
5411                 drbd_advance_rs_marks(device, device->ov_left);
5412
5413         if (device->ov_left == 0) {
5414                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5415                 if (dw) {
5416                         dw->w.cb = w_ov_finished;
5417                         dw->device = device;
5418                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5419                 } else {
5420                         drbd_err(device, "kmalloc(dw) failed.\n");
5421                         ov_out_of_sync_print(device);
5422                         drbd_resync_finished(device);
5423                 }
5424         }
5425         put_ldev(device);
5426         return 0;
5427 }
5428
5429 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5430 {
5431         return 0;
5432 }
5433
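/*
 * Process the done_ee lists of all volumes, i.e. send the acks for peer
 * requests whose local disk I/O has completed.  SIGNAL_ASENDER is cleared and
 * pending signals are flushed first so that a wake-up signal aimed at the
 * receive path does not abort this work; the loop repeats until every
 * volume's done_ee list has been seen empty.
 */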
5434 static int connection_finish_peer_reqs(struct drbd_connection *connection)
5435 {
5436         struct drbd_peer_device *peer_device;
5437         int vnr, not_empty = 0;
5438
5439         do {
5440                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5441                 flush_signals(current);
5442
5443                 rcu_read_lock();
5444                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5445                         struct drbd_device *device = peer_device->device;
5446                         kref_get(&device->kref);
5447                         rcu_read_unlock();
5448                         if (drbd_finish_peer_reqs(device)) {
5449                                 kref_put(&device->kref, drbd_destroy_device);
5450                                 return 1;
5451                         }
5452                         kref_put(&device->kref, drbd_destroy_device);
5453                         rcu_read_lock();
5454                 }
5455                 set_bit(SIGNAL_ASENDER, &connection->flags);
5456
5457                 spin_lock_irq(&connection->resource->req_lock);
5458                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5459                         struct drbd_device *device = peer_device->device;
5460                         not_empty = !list_empty(&device->done_ee);
5461                         if (not_empty)
5462                                 break;
5463                 }
5464                 spin_unlock_irq(&connection->resource->req_lock);
5465                 rcu_read_unlock();
5466         } while (not_empty);
5467
5468         return 0;
5469 }
5470
5471 struct asender_cmd {
5472         size_t pkt_size;
5473         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5474 };
5475
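/* Dispatch table for packets received on the meta socket, indexed by packet
 * type.  pkt_size is the payload expected after the header; the main loop
 * below uses it to decide how much more to receive before calling fn. */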
5476 static struct asender_cmd asender_tbl[] = {
5477         [P_PING]            = { 0, got_Ping },
5478         [P_PING_ACK]        = { 0, got_PingAck },
5479         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5480         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5481         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5482         [P_SUPERSEDED]      = { sizeof(struct p_block_ack), got_BlockAck },
5483         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5484         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5485         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5486         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5487         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5488         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5489         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5490         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5491         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5492         [P_CONN_ST_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_conn_RqSReply },
5493         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5494 };
5495
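/*
 * The asender thread: runs at SCHED_RR, sends pings and the acks produced by
 * connection_finish_peer_reqs(), and receives the ack-like packets on the
 * meta socket, dispatching them through asender_tbl.  While a ping is
 * outstanding, the socket receive timeout is shortened to ping_timeo (tenths
 * of a second); it is restored to ping_int seconds once the PingAck arrives.
 */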
5496 int drbd_asender(struct drbd_thread *thi)
5497 {
5498         struct drbd_connection *connection = thi->connection;
5499         struct asender_cmd *cmd = NULL;
5500         struct packet_info pi;
5501         int rv;
5502         void *buf    = connection->meta.rbuf;
5503         int received = 0;
5504         unsigned int header_size = drbd_header_size(connection);
5505         int expect   = header_size;
5506         bool ping_timeout_active = false;
5507         struct net_conf *nc;
5508         int ping_timeo, tcp_cork, ping_int;
5509         struct sched_param param = { .sched_priority = 2 };
5510
5511         rv = sched_setscheduler(current, SCHED_RR, &param);
5512         if (rv < 0)
5513                 drbd_err(connection, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5514
5515         while (get_t_state(thi) == RUNNING) {
5516                 drbd_thread_current_set_cpu(thi);
5517
5518                 rcu_read_lock();
5519                 nc = rcu_dereference(connection->net_conf);
5520                 ping_timeo = nc->ping_timeo;
5521                 tcp_cork = nc->tcp_cork;
5522                 ping_int = nc->ping_int;
5523                 rcu_read_unlock();
5524
5525                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5526                         if (drbd_send_ping(connection)) {
5527                                 drbd_err(connection, "drbd_send_ping has failed\n");
5528                                 goto reconnect;
5529                         }
5530                         connection->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5531                         ping_timeout_active = true;
5532                 }
5533
5534                 /* TODO: conditionally cork; it may hurt latency if we cork without
5535                    much to send */
5536                 if (tcp_cork)
5537                         drbd_tcp_cork(connection->meta.socket);
5538                 if (connection_finish_peer_reqs(connection)) {
5539                         drbd_err(connection, "connection_finish_peer_reqs() failed\n");
5540                         goto reconnect;
5541                 }
5542                 /* but unconditionally uncork unless disabled */
5543                 if (tcp_cork)
5544                         drbd_tcp_uncork(connection->meta.socket);
5545
5546                 /* short circuit, recv_msg would return EINTR anyway. */
5547                 if (signal_pending(current))
5548                         continue;
5549
5550                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5551                 clear_bit(SIGNAL_ASENDER, &connection->flags);
5552
5553                 flush_signals(current);
5554
5555                 /* Note:
5556                  * -EINTR        (on meta) we got a signal
5557                  * -EAGAIN       (on meta) rcvtimeo expired
5558                  * -ECONNRESET   other side closed the connection
5559                  * -ERESTARTSYS  (on data) we got a signal
5560                  * rv <  0       other than above: unexpected error!
5561                  * rv == expected: full header or command
5562                  * rv <  expected: "woken" by signal during receive
5563                  * rv == 0       : "connection shut down by peer"
5564                  */
5565 received_more:
5566                 if (likely(rv > 0)) {
5567                         received += rv;
5568                         buf      += rv;
5569                 } else if (rv == 0) {
5570                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5571                                 long t;
5572                                 rcu_read_lock();
5573                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5574                                 rcu_read_unlock();
5575
5576                                 t = wait_event_timeout(connection->ping_wait,
5577                                                        connection->cstate < C_WF_REPORT_PARAMS,
5578                                                        t);
5579                                 if (t)
5580                                         break;
5581                         }
5582                         drbd_err(connection, "meta connection shut down by peer.\n");
5583                         goto reconnect;
5584                 } else if (rv == -EAGAIN) {
5585                         /* If the data socket received something meanwhile,
5586                          * that is good enough: peer is still alive. */
5587                         if (time_after(connection->last_received,
5588                                 jiffies - connection->meta.socket->sk->sk_rcvtimeo))
5589                                 continue;
5590                         if (ping_timeout_active) {
5591                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5592                                 goto reconnect;
5593                         }
5594                         set_bit(SEND_PING, &connection->flags);
5595                         continue;
5596                 } else if (rv == -EINTR) {
5597                         continue;
5598                 } else {
5599                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5600                         goto reconnect;
5601                 }
5602
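                /* A full header is in: look up the handler and learn how
                 * much payload to expect for this packet. */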
5603                 if (received == expect && cmd == NULL) {
5604                         if (decode_header(connection, connection->meta.rbuf, &pi))
5605                                 goto reconnect;
5606                         cmd = &asender_tbl[pi.cmd];
5607                         if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5608                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5609                                          cmdname(pi.cmd), pi.cmd);
5610                                 goto disconnect;
5611                         }
5612                         expect = header_size + cmd->pkt_size;
5613                         if (pi.size != expect - header_size) {
5614                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5615                                         pi.cmd, pi.size);
5616                                 goto reconnect;
5617                         }
5618                 }
5619                 if (received == expect) {
5620                         bool err;
5621
5622                         err = cmd->fn(connection, &pi);
5623                         if (err) {
5624                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5625                                 goto reconnect;
5626                         }
5627
5628                         connection->last_received = jiffies;
5629
5630                         if (cmd == &asender_tbl[P_PING_ACK]) {
5631                                 /* restore idle timeout */
5632                                 connection->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5633                                 ping_timeout_active = false;
5634                         }
5635
5636                         buf      = connection->meta.rbuf;
5637                         received = 0;
5638                         expect   = header_size;
5639                         cmd      = NULL;
5640                 }
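                /* Before blocking again at the top of the loop: go send a
                 * ping if one was requested, otherwise opportunistically
                 * drain whatever is already queued on the socket. */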
5641                 if (test_bit(SEND_PING, &connection->flags))
5642                         continue;
5643                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, MSG_DONTWAIT);
5644                 if (rv > 0)
5645                         goto received_more;
5646         }
5647
5648         if (0) {
5649 reconnect:
5650                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5651                 conn_md_sync(connection);
5652         }
5653         if (0) {
5654 disconnect:
5655                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5656         }
5657         clear_bit(SIGNAL_ASENDER, &connection->flags);
5658
5659         drbd_info(connection, "asender terminated\n");
5660
5661         return 0;
5662 }