These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include "core.h"
39 #include "name_table.h"
40 #include "node.h"
41 #include "link.h"
42 #include "name_distr.h"
43 #include "socket.h"
44 #include "bcast.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_intv:
72  * @conn_timeout: the time we can wait for an unresponded setup request
73  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
74  * @link_cong: non-zero if owner must sleep because of link congestion
75  * @sent_unacked: # messages sent by socket, and not yet acked by peer
76  * @rcv_unacked: # messages read by user, but not yet acked back to peer
77  * @remote: 'connected' peer for dgram/rdm
78  * @node: hash table node
79  * @rcu: rcu struct for tipc_sock
80  */
81 struct tipc_sock {
82         struct sock sk;
83         int connected;
84         u32 conn_type;
85         u32 conn_instance;
86         int published;
87         u32 max_pkt;
88         u32 portid;
89         struct tipc_msg phdr;
90         struct list_head sock_list;
91         struct list_head publications;
92         u32 pub_count;
93         u32 probing_state;
94         unsigned long probing_intv;
95         uint conn_timeout;
96         atomic_t dupl_rcvcnt;
97         bool link_cong;
98         uint sent_unacked;
99         uint rcv_unacked;
100         struct sockaddr_tipc remote;
101         struct rhash_head node;
102         struct rcu_head rcu;
103 };
104
105 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
106 static void tipc_data_ready(struct sock *sk);
107 static void tipc_write_space(struct sock *sk);
108 static void tipc_sock_destruct(struct sock *sk);
109 static int tipc_release(struct socket *sock);
110 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
111 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
112 static void tipc_sk_timeout(unsigned long data);
113 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
114                            struct tipc_name_seq const *seq);
115 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
116                             struct tipc_name_seq const *seq);
117 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
118 static int tipc_sk_insert(struct tipc_sock *tsk);
119 static void tipc_sk_remove(struct tipc_sock *tsk);
120 static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
121                               size_t dsz);
122 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
123
124 static const struct proto_ops packet_ops;
125 static const struct proto_ops stream_ops;
126 static const struct proto_ops msg_ops;
127 static struct proto tipc_proto;
128
129 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
130         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
131         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
132         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
133         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
134         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
135 };
136
137 static const struct rhashtable_params tsk_rht_params;
138
139 /*
140  * Revised TIPC socket locking policy:
141  *
142  * Most socket operations take the standard socket lock when they start
143  * and hold it until they finish (or until they need to sleep).  Acquiring
144  * this lock grants the owner exclusive access to the fields of the socket
145  * data structures, with the exception of the backlog queue.  A few socket
146  * operations can be done without taking the socket lock because they only
147  * read socket information that never changes during the life of the socket.
148  *
149  * Socket operations may acquire the lock for the associated TIPC port if they
150  * need to perform an operation on the port.  If any routine needs to acquire
151  * both the socket lock and the port lock it must take the socket lock first
152  * to avoid the risk of deadlock.
153  *
154  * The dispatcher handling incoming messages cannot grab the socket lock in
155  * the standard fashion, since invoked it runs at the BH level and cannot block.
156  * Instead, it checks to see if the socket lock is currently owned by someone,
157  * and either handles the message itself or adds it to the socket's backlog
158  * queue; in the latter case the queued message is processed once the process
159  * owning the socket lock releases it.
160  *
161  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
162  * the problem of a blocked socket operation preventing any other operations
163  * from occurring.  However, applications must be careful if they have
164  * multiple threads trying to send (or receive) on the same socket, as these
165  * operations might interfere with each other.  For example, doing a connect
166  * and a receive at the same time might allow the receive to consume the
167  * ACK message meant for the connect.  While additional work could be done
168  * to try and overcome this, it doesn't seem to be worthwhile at the present.
169  *
170  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
171  * that another operation that must be performed in a non-blocking manner is
172  * not delayed for very long because the lock has already been taken.
173  *
174  * NOTE: This code assumes that certain fields of a port/socket pair are
175  * constant over its lifetime; such fields can be examined without taking
176  * the socket lock and/or port lock, and do not need to be re-read even
177  * after resuming processing after waiting.  These fields include:
178  *   - socket type
179  *   - pointer to socket sk structure (aka tipc_sock structure)
180  *   - pointer to port structure
181  *   - port reference
182  */
183
184 static u32 tsk_own_node(struct tipc_sock *tsk)
185 {
186         return msg_prevnode(&tsk->phdr);
187 }
188
189 static u32 tsk_peer_node(struct tipc_sock *tsk)
190 {
191         return msg_destnode(&tsk->phdr);
192 }
193
194 static u32 tsk_peer_port(struct tipc_sock *tsk)
195 {
196         return msg_destport(&tsk->phdr);
197 }
198
199 static  bool tsk_unreliable(struct tipc_sock *tsk)
200 {
201         return msg_src_droppable(&tsk->phdr) != 0;
202 }
203
204 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
205 {
206         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
207 }
208
209 static bool tsk_unreturnable(struct tipc_sock *tsk)
210 {
211         return msg_dest_droppable(&tsk->phdr) != 0;
212 }
213
214 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
215 {
216         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
217 }
218
219 static int tsk_importance(struct tipc_sock *tsk)
220 {
221         return msg_importance(&tsk->phdr);
222 }
223
224 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
225 {
226         if (imp > TIPC_CRITICAL_IMPORTANCE)
227                 return -EINVAL;
228         msg_set_importance(&tsk->phdr, (u32)imp);
229         return 0;
230 }
231
232 static struct tipc_sock *tipc_sk(const struct sock *sk)
233 {
234         return container_of(sk, struct tipc_sock, sk);
235 }
236
237 static int tsk_conn_cong(struct tipc_sock *tsk)
238 {
239         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
240 }
241
242 /**
243  * tsk_advance_rx_queue - discard first buffer in socket receive queue
244  *
245  * Caller must hold socket lock
246  */
247 static void tsk_advance_rx_queue(struct sock *sk)
248 {
249         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
250 }
251
252 /* tipc_sk_respond() : send response message back to sender
253  */
254 static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
255 {
256         u32 selector;
257         u32 dnode;
258         u32 onode = tipc_own_addr(sock_net(sk));
259
260         if (!tipc_msg_reverse(onode, &skb, err))
261                 return;
262
263         dnode = msg_destnode(buf_msg(skb));
264         selector = msg_origport(buf_msg(skb));
265         tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
266 }
267
268 /**
269  * tsk_rej_rx_queue - reject all buffers in socket receive queue
270  *
271  * Caller must hold socket lock
272  */
273 static void tsk_rej_rx_queue(struct sock *sk)
274 {
275         struct sk_buff *skb;
276
277         while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
278                 tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
279 }
280
281 /* tsk_peer_msg - verify if message was sent by connected port's peer
282  *
283  * Handles cases where the node's network address has changed from
284  * the default of <0.0.0> to its configured setting.
285  */
286 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
287 {
288         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
289         u32 peer_port = tsk_peer_port(tsk);
290         u32 orig_node;
291         u32 peer_node;
292
293         if (unlikely(!tsk->connected))
294                 return false;
295
296         if (unlikely(msg_origport(msg) != peer_port))
297                 return false;
298
299         orig_node = msg_orignode(msg);
300         peer_node = tsk_peer_node(tsk);
301
302         if (likely(orig_node == peer_node))
303                 return true;
304
305         if (!orig_node && (peer_node == tn->own_addr))
306                 return true;
307
308         if (!peer_node && (orig_node == tn->own_addr))
309                 return true;
310
311         return false;
312 }
313
314 /**
315  * tipc_sk_create - create a TIPC socket
316  * @net: network namespace (must be default network)
317  * @sock: pre-allocated socket structure
318  * @protocol: protocol indicator (must be 0)
319  * @kern: caused by kernel or by userspace?
320  *
321  * This routine creates additional data structures used by the TIPC socket,
322  * initializes them, and links them together.
323  *
324  * Returns 0 on success, errno otherwise
325  */
326 static int tipc_sk_create(struct net *net, struct socket *sock,
327                           int protocol, int kern)
328 {
329         struct tipc_net *tn;
330         const struct proto_ops *ops;
331         socket_state state;
332         struct sock *sk;
333         struct tipc_sock *tsk;
334         struct tipc_msg *msg;
335
336         /* Validate arguments */
337         if (unlikely(protocol != 0))
338                 return -EPROTONOSUPPORT;
339
340         switch (sock->type) {
341         case SOCK_STREAM:
342                 ops = &stream_ops;
343                 state = SS_UNCONNECTED;
344                 break;
345         case SOCK_SEQPACKET:
346                 ops = &packet_ops;
347                 state = SS_UNCONNECTED;
348                 break;
349         case SOCK_DGRAM:
350         case SOCK_RDM:
351                 ops = &msg_ops;
352                 state = SS_READY;
353                 break;
354         default:
355                 return -EPROTOTYPE;
356         }
357
358         /* Allocate socket's protocol area */
359         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
360         if (sk == NULL)
361                 return -ENOMEM;
362
363         tsk = tipc_sk(sk);
364         tsk->max_pkt = MAX_PKT_DEFAULT;
365         INIT_LIST_HEAD(&tsk->publications);
366         msg = &tsk->phdr;
367         tn = net_generic(sock_net(sk), tipc_net_id);
368         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
369                       NAMED_H_SIZE, 0);
370
371         /* Finish initializing socket data structures */
372         sock->ops = ops;
373         sock->state = state;
374         sock_init_data(sock, sk);
375         if (tipc_sk_insert(tsk)) {
376                 pr_warn("Socket create failed; port numbrer exhausted\n");
377                 return -EINVAL;
378         }
379         msg_set_origport(msg, tsk->portid);
380         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
381         sk->sk_backlog_rcv = tipc_backlog_rcv;
382         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
383         sk->sk_data_ready = tipc_data_ready;
384         sk->sk_write_space = tipc_write_space;
385         sk->sk_destruct = tipc_sock_destruct;
386         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
387         tsk->sent_unacked = 0;
388         atomic_set(&tsk->dupl_rcvcnt, 0);
389
390         if (sock->state == SS_READY) {
391                 tsk_set_unreturnable(tsk, true);
392                 if (sock->type == SOCK_DGRAM)
393                         tsk_set_unreliable(tsk, true);
394         }
395         return 0;
396 }
397
398 static void tipc_sk_callback(struct rcu_head *head)
399 {
400         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
401
402         sock_put(&tsk->sk);
403 }
404
405 /**
406  * tipc_release - destroy a TIPC socket
407  * @sock: socket to destroy
408  *
409  * This routine cleans up any messages that are still queued on the socket.
410  * For DGRAM and RDM socket types, all queued messages are rejected.
411  * For SEQPACKET and STREAM socket types, the first message is rejected
412  * and any others are discarded.  (If the first message on a STREAM socket
413  * is partially-read, it is discarded and the next one is rejected instead.)
414  *
415  * NOTE: Rejected messages are not necessarily returned to the sender!  They
416  * are returned or discarded according to the "destination droppable" setting
417  * specified for the message by the sender.
418  *
419  * Returns 0 on success, errno otherwise
420  */
421 static int tipc_release(struct socket *sock)
422 {
423         struct sock *sk = sock->sk;
424         struct net *net;
425         struct tipc_sock *tsk;
426         struct sk_buff *skb;
427         u32 dnode;
428
429         /*
430          * Exit if socket isn't fully initialized (occurs when a failed accept()
431          * releases a pre-allocated child socket that was never used)
432          */
433         if (sk == NULL)
434                 return 0;
435
436         net = sock_net(sk);
437         tsk = tipc_sk(sk);
438         lock_sock(sk);
439
440         /*
441          * Reject all unreceived messages, except on an active connection
442          * (which disconnects locally & sends a 'FIN+' to peer)
443          */
444         dnode = tsk_peer_node(tsk);
445         while (sock->state != SS_DISCONNECTING) {
446                 skb = __skb_dequeue(&sk->sk_receive_queue);
447                 if (skb == NULL)
448                         break;
449                 if (TIPC_SKB_CB(skb)->handle != NULL)
450                         kfree_skb(skb);
451                 else {
452                         if ((sock->state == SS_CONNECTING) ||
453                             (sock->state == SS_CONNECTED)) {
454                                 sock->state = SS_DISCONNECTING;
455                                 tsk->connected = 0;
456                                 tipc_node_remove_conn(net, dnode, tsk->portid);
457                         }
458                         tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
459                 }
460         }
461
462         tipc_sk_withdraw(tsk, 0, NULL);
463         sk_stop_timer(sk, &sk->sk_timer);
464         tipc_sk_remove(tsk);
465         if (tsk->connected) {
466                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
467                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
468                                       tsk_own_node(tsk), tsk_peer_port(tsk),
469                                       tsk->portid, TIPC_ERR_NO_PORT);
470                 if (skb)
471                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
472                 tipc_node_remove_conn(net, dnode, tsk->portid);
473         }
474
475         /* Reject any messages that accumulated in backlog queue */
476         sock->state = SS_DISCONNECTING;
477         release_sock(sk);
478
479         call_rcu(&tsk->rcu, tipc_sk_callback);
480         sock->sk = NULL;
481
482         return 0;
483 }
484
485 /**
486  * tipc_bind - associate or disassocate TIPC name(s) with a socket
487  * @sock: socket structure
488  * @uaddr: socket address describing name(s) and desired operation
489  * @uaddr_len: size of socket address data structure
490  *
491  * Name and name sequence binding is indicated using a positive scope value;
492  * a negative scope value unbinds the specified name.  Specifying no name
493  * (i.e. a socket address length of 0) unbinds all names from the socket.
494  *
495  * Returns 0 on success, errno otherwise
496  *
497  * NOTE: This routine doesn't need to take the socket lock since it doesn't
498  *       access any non-constant socket information.
499  */
500 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
501                      int uaddr_len)
502 {
503         struct sock *sk = sock->sk;
504         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
505         struct tipc_sock *tsk = tipc_sk(sk);
506         int res = -EINVAL;
507
508         lock_sock(sk);
509         if (unlikely(!uaddr_len)) {
510                 res = tipc_sk_withdraw(tsk, 0, NULL);
511                 goto exit;
512         }
513
514         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
515                 res = -EINVAL;
516                 goto exit;
517         }
518         if (addr->family != AF_TIPC) {
519                 res = -EAFNOSUPPORT;
520                 goto exit;
521         }
522
523         if (addr->addrtype == TIPC_ADDR_NAME)
524                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
525         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
526                 res = -EAFNOSUPPORT;
527                 goto exit;
528         }
529
530         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
531             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
532             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
533                 res = -EACCES;
534                 goto exit;
535         }
536
537         res = (addr->scope > 0) ?
538                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
539                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
540 exit:
541         release_sock(sk);
542         return res;
543 }
544
545 /**
546  * tipc_getname - get port ID of socket or peer socket
547  * @sock: socket structure
548  * @uaddr: area for returned socket address
549  * @uaddr_len: area for returned length of socket address
550  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
551  *
552  * Returns 0 on success, errno otherwise
553  *
554  * NOTE: This routine doesn't need to take the socket lock since it only
555  *       accesses socket information that is unchanging (or which changes in
556  *       a completely predictable manner).
557  */
558 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
559                         int *uaddr_len, int peer)
560 {
561         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
562         struct tipc_sock *tsk = tipc_sk(sock->sk);
563         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
564
565         memset(addr, 0, sizeof(*addr));
566         if (peer) {
567                 if ((sock->state != SS_CONNECTED) &&
568                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
569                         return -ENOTCONN;
570                 addr->addr.id.ref = tsk_peer_port(tsk);
571                 addr->addr.id.node = tsk_peer_node(tsk);
572         } else {
573                 addr->addr.id.ref = tsk->portid;
574                 addr->addr.id.node = tn->own_addr;
575         }
576
577         *uaddr_len = sizeof(*addr);
578         addr->addrtype = TIPC_ADDR_ID;
579         addr->family = AF_TIPC;
580         addr->scope = 0;
581         addr->addr.name.domain = 0;
582
583         return 0;
584 }
585
586 /**
587  * tipc_poll - read and possibly block on pollmask
588  * @file: file structure associated with the socket
589  * @sock: socket for which to calculate the poll bits
590  * @wait: ???
591  *
592  * Returns pollmask value
593  *
594  * COMMENTARY:
595  * It appears that the usual socket locking mechanisms are not useful here
596  * since the pollmask info is potentially out-of-date the moment this routine
597  * exits.  TCP and other protocols seem to rely on higher level poll routines
598  * to handle any preventable race conditions, so TIPC will do the same ...
599  *
600  * TIPC sets the returned events as follows:
601  *
602  * socket state         flags set
603  * ------------         ---------
604  * unconnected          no read flags
605  *                      POLLOUT if port is not congested
606  *
607  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
608  *                      no write flags
609  *
610  * connected            POLLIN/POLLRDNORM if data in rx queue
611  *                      POLLOUT if port is not congested
612  *
613  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
614  *                      no write flags
615  *
616  * listening            POLLIN if SYN in rx queue
617  *                      no write flags
618  *
619  * ready                POLLIN/POLLRDNORM if data in rx queue
620  * [connectionless]     POLLOUT (since port cannot be congested)
621  *
622  * IMPORTANT: The fact that a read or write operation is indicated does NOT
623  * imply that the operation will succeed, merely that it should be performed
624  * and will not block.
625  */
626 static unsigned int tipc_poll(struct file *file, struct socket *sock,
627                               poll_table *wait)
628 {
629         struct sock *sk = sock->sk;
630         struct tipc_sock *tsk = tipc_sk(sk);
631         u32 mask = 0;
632
633         sock_poll_wait(file, sk_sleep(sk), wait);
634
635         switch ((int)sock->state) {
636         case SS_UNCONNECTED:
637                 if (!tsk->link_cong)
638                         mask |= POLLOUT;
639                 break;
640         case SS_READY:
641         case SS_CONNECTED:
642                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
643                         mask |= POLLOUT;
644                 /* fall thru' */
645         case SS_CONNECTING:
646         case SS_LISTENING:
647                 if (!skb_queue_empty(&sk->sk_receive_queue))
648                         mask |= (POLLIN | POLLRDNORM);
649                 break;
650         case SS_DISCONNECTING:
651                 mask = (POLLIN | POLLRDNORM | POLLHUP);
652                 break;
653         }
654
655         return mask;
656 }
657
658 /**
659  * tipc_sendmcast - send multicast message
660  * @sock: socket structure
661  * @seq: destination address
662  * @msg: message to send
663  * @dsz: total length of message data
664  * @timeo: timeout to wait for wakeup
665  *
666  * Called from function tipc_sendmsg(), which has done all sanity checks
667  * Returns the number of bytes sent on success, or errno
668  */
669 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
670                           struct msghdr *msg, size_t dsz, long timeo)
671 {
672         struct sock *sk = sock->sk;
673         struct tipc_sock *tsk = tipc_sk(sk);
674         struct net *net = sock_net(sk);
675         struct tipc_msg *mhdr = &tsk->phdr;
676         struct sk_buff_head *pktchain = &sk->sk_write_queue;
677         struct iov_iter save = msg->msg_iter;
678         uint mtu;
679         int rc;
680
681         msg_set_type(mhdr, TIPC_MCAST_MSG);
682         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
683         msg_set_destport(mhdr, 0);
684         msg_set_destnode(mhdr, 0);
685         msg_set_nametype(mhdr, seq->type);
686         msg_set_namelower(mhdr, seq->lower);
687         msg_set_nameupper(mhdr, seq->upper);
688         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
689
690 new_mtu:
691         mtu = tipc_bcast_get_mtu(net);
692         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
693         if (unlikely(rc < 0))
694                 return rc;
695
696         do {
697                 rc = tipc_bcast_xmit(net, pktchain);
698                 if (likely(!rc))
699                         return dsz;
700
701                 if (rc == -ELINKCONG) {
702                         tsk->link_cong = 1;
703                         rc = tipc_wait_for_sndmsg(sock, &timeo);
704                         if (!rc)
705                                 continue;
706                 }
707                 __skb_queue_purge(pktchain);
708                 if (rc == -EMSGSIZE) {
709                         msg->msg_iter = save;
710                         goto new_mtu;
711                 }
712                 break;
713         } while (1);
714         return rc;
715 }
716
717 /**
718  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
719  * @arrvq: queue with arriving messages, to be cloned after destination lookup
720  * @inputq: queue with cloned messages, delivered to socket after dest lookup
721  *
722  * Multi-threaded: parallel calls with reference to same queues may occur
723  */
724 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
725                        struct sk_buff_head *inputq)
726 {
727         struct tipc_msg *msg;
728         struct tipc_plist dports;
729         u32 portid;
730         u32 scope = TIPC_CLUSTER_SCOPE;
731         struct sk_buff_head tmpq;
732         uint hsz;
733         struct sk_buff *skb, *_skb;
734
735         __skb_queue_head_init(&tmpq);
736         tipc_plist_init(&dports);
737
738         skb = tipc_skb_peek(arrvq, &inputq->lock);
739         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
740                 msg = buf_msg(skb);
741                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
742
743                 if (in_own_node(net, msg_orignode(msg)))
744                         scope = TIPC_NODE_SCOPE;
745
746                 /* Create destination port list and message clones: */
747                 tipc_nametbl_mc_translate(net,
748                                           msg_nametype(msg), msg_namelower(msg),
749                                           msg_nameupper(msg), scope, &dports);
750                 portid = tipc_plist_pop(&dports);
751                 for (; portid; portid = tipc_plist_pop(&dports)) {
752                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
753                         if (_skb) {
754                                 msg_set_destport(buf_msg(_skb), portid);
755                                 __skb_queue_tail(&tmpq, _skb);
756                                 continue;
757                         }
758                         pr_warn("Failed to clone mcast rcv buffer\n");
759                 }
760                 /* Append to inputq if not already done by other thread */
761                 spin_lock_bh(&inputq->lock);
762                 if (skb_peek(arrvq) == skb) {
763                         skb_queue_splice_tail_init(&tmpq, inputq);
764                         kfree_skb(__skb_dequeue(arrvq));
765                 }
766                 spin_unlock_bh(&inputq->lock);
767                 __skb_queue_purge(&tmpq);
768                 kfree_skb(skb);
769         }
770         tipc_sk_rcv(net, inputq);
771 }
772
773 /**
774  * tipc_sk_proto_rcv - receive a connection mng protocol message
775  * @tsk: receiving socket
776  * @skb: pointer to message buffer.
777  */
778 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
779 {
780         struct sock *sk = &tsk->sk;
781         struct tipc_msg *hdr = buf_msg(skb);
782         int mtyp = msg_type(hdr);
783         int conn_cong;
784
785         /* Ignore if connection cannot be validated: */
786         if (!tsk_peer_msg(tsk, hdr))
787                 goto exit;
788
789         tsk->probing_state = TIPC_CONN_OK;
790
791         if (mtyp == CONN_PROBE) {
792                 msg_set_type(hdr, CONN_PROBE_REPLY);
793                 tipc_sk_respond(sk, skb, TIPC_OK);
794                 return;
795         } else if (mtyp == CONN_ACK) {
796                 conn_cong = tsk_conn_cong(tsk);
797                 tsk->sent_unacked -= msg_msgcnt(hdr);
798                 if (conn_cong)
799                         sk->sk_write_space(sk);
800         } else if (mtyp != CONN_PROBE_REPLY) {
801                 pr_warn("Received unknown CONN_PROTO msg\n");
802         }
803 exit:
804         kfree_skb(skb);
805 }
806
807 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
808 {
809         struct sock *sk = sock->sk;
810         struct tipc_sock *tsk = tipc_sk(sk);
811         DEFINE_WAIT(wait);
812         int done;
813
814         do {
815                 int err = sock_error(sk);
816                 if (err)
817                         return err;
818                 if (sock->state == SS_DISCONNECTING)
819                         return -EPIPE;
820                 if (!*timeo_p)
821                         return -EAGAIN;
822                 if (signal_pending(current))
823                         return sock_intr_errno(*timeo_p);
824
825                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
826                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
827                 finish_wait(sk_sleep(sk), &wait);
828         } while (!done);
829         return 0;
830 }
831
832 /**
833  * tipc_sendmsg - send message in connectionless manner
834  * @sock: socket structure
835  * @m: message to send
836  * @dsz: amount of user data to be sent
837  *
838  * Message must have an destination specified explicitly.
839  * Used for SOCK_RDM and SOCK_DGRAM messages,
840  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
841  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
842  *
843  * Returns the number of bytes sent on success, or errno otherwise
844  */
845 static int tipc_sendmsg(struct socket *sock,
846                         struct msghdr *m, size_t dsz)
847 {
848         struct sock *sk = sock->sk;
849         int ret;
850
851         lock_sock(sk);
852         ret = __tipc_sendmsg(sock, m, dsz);
853         release_sock(sk);
854
855         return ret;
856 }
857
858 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
859 {
860         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
861         struct sock *sk = sock->sk;
862         struct tipc_sock *tsk = tipc_sk(sk);
863         struct net *net = sock_net(sk);
864         struct tipc_msg *mhdr = &tsk->phdr;
865         u32 dnode, dport;
866         struct sk_buff_head *pktchain = &sk->sk_write_queue;
867         struct sk_buff *skb;
868         struct tipc_name_seq *seq;
869         struct iov_iter save;
870         u32 mtu;
871         long timeo;
872         int rc;
873
874         if (dsz > TIPC_MAX_USER_MSG_SIZE)
875                 return -EMSGSIZE;
876         if (unlikely(!dest)) {
877                 if (tsk->connected && sock->state == SS_READY)
878                         dest = &tsk->remote;
879                 else
880                         return -EDESTADDRREQ;
881         } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
882                    dest->family != AF_TIPC) {
883                 return -EINVAL;
884         }
885         if (unlikely(sock->state != SS_READY)) {
886                 if (sock->state == SS_LISTENING)
887                         return -EPIPE;
888                 if (sock->state != SS_UNCONNECTED)
889                         return -EISCONN;
890                 if (tsk->published)
891                         return -EOPNOTSUPP;
892                 if (dest->addrtype == TIPC_ADDR_NAME) {
893                         tsk->conn_type = dest->addr.name.name.type;
894                         tsk->conn_instance = dest->addr.name.name.instance;
895                 }
896         }
897         seq = &dest->addr.nameseq;
898         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
899
900         if (dest->addrtype == TIPC_ADDR_MCAST) {
901                 return tipc_sendmcast(sock, seq, m, dsz, timeo);
902         } else if (dest->addrtype == TIPC_ADDR_NAME) {
903                 u32 type = dest->addr.name.name.type;
904                 u32 inst = dest->addr.name.name.instance;
905                 u32 domain = dest->addr.name.domain;
906
907                 dnode = domain;
908                 msg_set_type(mhdr, TIPC_NAMED_MSG);
909                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
910                 msg_set_nametype(mhdr, type);
911                 msg_set_nameinst(mhdr, inst);
912                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
913                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
914                 msg_set_destnode(mhdr, dnode);
915                 msg_set_destport(mhdr, dport);
916                 if (unlikely(!dport && !dnode))
917                         return -EHOSTUNREACH;
918         } else if (dest->addrtype == TIPC_ADDR_ID) {
919                 dnode = dest->addr.id.node;
920                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
921                 msg_set_lookup_scope(mhdr, 0);
922                 msg_set_destnode(mhdr, dnode);
923                 msg_set_destport(mhdr, dest->addr.id.ref);
924                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
925         }
926
927         save = m->msg_iter;
928 new_mtu:
929         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
930         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
931         if (rc < 0)
932                 return rc;
933
934         do {
935                 skb = skb_peek(pktchain);
936                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
937                 rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
938                 if (likely(!rc)) {
939                         if (sock->state != SS_READY)
940                                 sock->state = SS_CONNECTING;
941                         return dsz;
942                 }
943                 if (rc == -ELINKCONG) {
944                         tsk->link_cong = 1;
945                         rc = tipc_wait_for_sndmsg(sock, &timeo);
946                         if (!rc)
947                                 continue;
948                 }
949                 __skb_queue_purge(pktchain);
950                 if (rc == -EMSGSIZE) {
951                         m->msg_iter = save;
952                         goto new_mtu;
953                 }
954                 break;
955         } while (1);
956
957         return rc;
958 }
959
960 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
961 {
962         struct sock *sk = sock->sk;
963         struct tipc_sock *tsk = tipc_sk(sk);
964         DEFINE_WAIT(wait);
965         int done;
966
967         do {
968                 int err = sock_error(sk);
969                 if (err)
970                         return err;
971                 if (sock->state == SS_DISCONNECTING)
972                         return -EPIPE;
973                 else if (sock->state != SS_CONNECTED)
974                         return -ENOTCONN;
975                 if (!*timeo_p)
976                         return -EAGAIN;
977                 if (signal_pending(current))
978                         return sock_intr_errno(*timeo_p);
979
980                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
981                 done = sk_wait_event(sk, timeo_p,
982                                      (!tsk->link_cong &&
983                                       !tsk_conn_cong(tsk)) ||
984                                      !tsk->connected);
985                 finish_wait(sk_sleep(sk), &wait);
986         } while (!done);
987         return 0;
988 }
989
990 /**
991  * tipc_send_stream - send stream-oriented data
992  * @sock: socket structure
993  * @m: data to send
994  * @dsz: total length of data to be transmitted
995  *
996  * Used for SOCK_STREAM data.
997  *
998  * Returns the number of bytes sent on success (or partial success),
999  * or errno if no data sent
1000  */
1001 static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1002 {
1003         struct sock *sk = sock->sk;
1004         int ret;
1005
1006         lock_sock(sk);
1007         ret = __tipc_send_stream(sock, m, dsz);
1008         release_sock(sk);
1009
1010         return ret;
1011 }
1012
1013 static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1014 {
1015         struct sock *sk = sock->sk;
1016         struct net *net = sock_net(sk);
1017         struct tipc_sock *tsk = tipc_sk(sk);
1018         struct tipc_msg *mhdr = &tsk->phdr;
1019         struct sk_buff_head *pktchain = &sk->sk_write_queue;
1020         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1021         u32 portid = tsk->portid;
1022         int rc = -EINVAL;
1023         long timeo;
1024         u32 dnode;
1025         uint mtu, send, sent = 0;
1026         struct iov_iter save;
1027
1028         /* Handle implied connection establishment */
1029         if (unlikely(dest)) {
1030                 rc = __tipc_sendmsg(sock, m, dsz);
1031                 if (dsz && (dsz == rc))
1032                         tsk->sent_unacked = 1;
1033                 return rc;
1034         }
1035         if (dsz > (uint)INT_MAX)
1036                 return -EMSGSIZE;
1037
1038         if (unlikely(sock->state != SS_CONNECTED)) {
1039                 if (sock->state == SS_DISCONNECTING)
1040                         return -EPIPE;
1041                 else
1042                         return -ENOTCONN;
1043         }
1044
1045         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1046         dnode = tsk_peer_node(tsk);
1047
1048 next:
1049         save = m->msg_iter;
1050         mtu = tsk->max_pkt;
1051         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1052         rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
1053         if (unlikely(rc < 0))
1054                 return rc;
1055         do {
1056                 if (likely(!tsk_conn_cong(tsk))) {
1057                         rc = tipc_node_xmit(net, pktchain, dnode, portid);
1058                         if (likely(!rc)) {
1059                                 tsk->sent_unacked++;
1060                                 sent += send;
1061                                 if (sent == dsz)
1062                                         return dsz;
1063                                 goto next;
1064                         }
1065                         if (rc == -EMSGSIZE) {
1066                                 __skb_queue_purge(pktchain);
1067                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1068                                                                  portid);
1069                                 m->msg_iter = save;
1070                                 goto next;
1071                         }
1072                         if (rc != -ELINKCONG)
1073                                 break;
1074
1075                         tsk->link_cong = 1;
1076                 }
1077                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1078         } while (!rc);
1079
1080         __skb_queue_purge(pktchain);
1081         return sent ? sent : rc;
1082 }
1083
1084 /**
1085  * tipc_send_packet - send a connection-oriented message
1086  * @sock: socket structure
1087  * @m: message to send
1088  * @dsz: length of data to be transmitted
1089  *
1090  * Used for SOCK_SEQPACKET messages.
1091  *
1092  * Returns the number of bytes sent on success, or errno otherwise
1093  */
1094 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1095 {
1096         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1097                 return -EMSGSIZE;
1098
1099         return tipc_send_stream(sock, m, dsz);
1100 }
1101
1102 /* tipc_sk_finish_conn - complete the setup of a connection
1103  */
1104 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1105                                 u32 peer_node)
1106 {
1107         struct sock *sk = &tsk->sk;
1108         struct net *net = sock_net(sk);
1109         struct tipc_msg *msg = &tsk->phdr;
1110
1111         msg_set_destnode(msg, peer_node);
1112         msg_set_destport(msg, peer_port);
1113         msg_set_type(msg, TIPC_CONN_MSG);
1114         msg_set_lookup_scope(msg, 0);
1115         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1116
1117         tsk->probing_intv = CONN_PROBING_INTERVAL;
1118         tsk->probing_state = TIPC_CONN_OK;
1119         tsk->connected = 1;
1120         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1121         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1122         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1123 }
1124
1125 /**
1126  * set_orig_addr - capture sender's address for received message
1127  * @m: descriptor for message info
1128  * @msg: received message header
1129  *
1130  * Note: Address is not captured if not requested by receiver.
1131  */
1132 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1133 {
1134         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1135
1136         if (addr) {
1137                 addr->family = AF_TIPC;
1138                 addr->addrtype = TIPC_ADDR_ID;
1139                 memset(&addr->addr, 0, sizeof(addr->addr));
1140                 addr->addr.id.ref = msg_origport(msg);
1141                 addr->addr.id.node = msg_orignode(msg);
1142                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1143                 addr->scope = 0;                /* could leave uninitialized */
1144                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1145         }
1146 }
1147
1148 /**
1149  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1150  * @m: descriptor for message info
1151  * @msg: received message header
1152  * @tsk: TIPC port associated with message
1153  *
1154  * Note: Ancillary data is not captured if not requested by receiver.
1155  *
1156  * Returns 0 if successful, otherwise errno
1157  */
1158 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1159                                  struct tipc_sock *tsk)
1160 {
1161         u32 anc_data[3];
1162         u32 err;
1163         u32 dest_type;
1164         int has_name;
1165         int res;
1166
1167         if (likely(m->msg_controllen == 0))
1168                 return 0;
1169
1170         /* Optionally capture errored message object(s) */
1171         err = msg ? msg_errcode(msg) : 0;
1172         if (unlikely(err)) {
1173                 anc_data[0] = err;
1174                 anc_data[1] = msg_data_sz(msg);
1175                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1176                 if (res)
1177                         return res;
1178                 if (anc_data[1]) {
1179                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1180                                        msg_data(msg));
1181                         if (res)
1182                                 return res;
1183                 }
1184         }
1185
1186         /* Optionally capture message destination object */
1187         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1188         switch (dest_type) {
1189         case TIPC_NAMED_MSG:
1190                 has_name = 1;
1191                 anc_data[0] = msg_nametype(msg);
1192                 anc_data[1] = msg_namelower(msg);
1193                 anc_data[2] = msg_namelower(msg);
1194                 break;
1195         case TIPC_MCAST_MSG:
1196                 has_name = 1;
1197                 anc_data[0] = msg_nametype(msg);
1198                 anc_data[1] = msg_namelower(msg);
1199                 anc_data[2] = msg_nameupper(msg);
1200                 break;
1201         case TIPC_CONN_MSG:
1202                 has_name = (tsk->conn_type != 0);
1203                 anc_data[0] = tsk->conn_type;
1204                 anc_data[1] = tsk->conn_instance;
1205                 anc_data[2] = tsk->conn_instance;
1206                 break;
1207         default:
1208                 has_name = 0;
1209         }
1210         if (has_name) {
1211                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1212                 if (res)
1213                         return res;
1214         }
1215
1216         return 0;
1217 }
1218
1219 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1220 {
1221         struct net *net = sock_net(&tsk->sk);
1222         struct sk_buff *skb = NULL;
1223         struct tipc_msg *msg;
1224         u32 peer_port = tsk_peer_port(tsk);
1225         u32 dnode = tsk_peer_node(tsk);
1226
1227         if (!tsk->connected)
1228                 return;
1229         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1230                               dnode, tsk_own_node(tsk), peer_port,
1231                               tsk->portid, TIPC_OK);
1232         if (!skb)
1233                 return;
1234         msg = buf_msg(skb);
1235         msg_set_msgcnt(msg, ack);
1236         tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1237 }
1238
1239 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1240 {
1241         struct sock *sk = sock->sk;
1242         DEFINE_WAIT(wait);
1243         long timeo = *timeop;
1244         int err;
1245
1246         for (;;) {
1247                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1248                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1249                         if (sock->state == SS_DISCONNECTING) {
1250                                 err = -ENOTCONN;
1251                                 break;
1252                         }
1253                         release_sock(sk);
1254                         timeo = schedule_timeout(timeo);
1255                         lock_sock(sk);
1256                 }
1257                 err = 0;
1258                 if (!skb_queue_empty(&sk->sk_receive_queue))
1259                         break;
1260                 err = -EAGAIN;
1261                 if (!timeo)
1262                         break;
1263                 err = sock_intr_errno(timeo);
1264                 if (signal_pending(current))
1265                         break;
1266         }
1267         finish_wait(sk_sleep(sk), &wait);
1268         *timeop = timeo;
1269         return err;
1270 }
1271
1272 /**
1273  * tipc_recvmsg - receive packet-oriented message
1274  * @m: descriptor for message info
1275  * @buf_len: total size of user buffer area
1276  * @flags: receive flags
1277  *
1278  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1279  * If the complete message doesn't fit in user area, truncate it.
1280  *
1281  * Returns size of returned message data, errno otherwise
1282  */
1283 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1284                         int flags)
1285 {
1286         struct sock *sk = sock->sk;
1287         struct tipc_sock *tsk = tipc_sk(sk);
1288         struct sk_buff *buf;
1289         struct tipc_msg *msg;
1290         long timeo;
1291         unsigned int sz;
1292         u32 err;
1293         int res;
1294
1295         /* Catch invalid receive requests */
1296         if (unlikely(!buf_len))
1297                 return -EINVAL;
1298
1299         lock_sock(sk);
1300
1301         if (unlikely(sock->state == SS_UNCONNECTED)) {
1302                 res = -ENOTCONN;
1303                 goto exit;
1304         }
1305
1306         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1307 restart:
1308
1309         /* Look for a message in receive queue; wait if necessary */
1310         res = tipc_wait_for_rcvmsg(sock, &timeo);
1311         if (res)
1312                 goto exit;
1313
1314         /* Look at first message in receive queue */
1315         buf = skb_peek(&sk->sk_receive_queue);
1316         msg = buf_msg(buf);
1317         sz = msg_data_sz(msg);
1318         err = msg_errcode(msg);
1319
1320         /* Discard an empty non-errored message & try again */
1321         if ((!sz) && (!err)) {
1322                 tsk_advance_rx_queue(sk);
1323                 goto restart;
1324         }
1325
1326         /* Capture sender's address (optional) */
1327         set_orig_addr(m, msg);
1328
1329         /* Capture ancillary data (optional) */
1330         res = tipc_sk_anc_data_recv(m, msg, tsk);
1331         if (res)
1332                 goto exit;
1333
1334         /* Capture message data (if valid) & compute return value (always) */
1335         if (!err) {
1336                 if (unlikely(buf_len < sz)) {
1337                         sz = buf_len;
1338                         m->msg_flags |= MSG_TRUNC;
1339                 }
1340                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1341                 if (res)
1342                         goto exit;
1343                 res = sz;
1344         } else {
1345                 if ((sock->state == SS_READY) ||
1346                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1347                         res = 0;
1348                 else
1349                         res = -ECONNRESET;
1350         }
1351
1352         /* Consume received message (optional) */
1353         if (likely(!(flags & MSG_PEEK))) {
1354                 if ((sock->state != SS_READY) &&
1355                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1356                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1357                         tsk->rcv_unacked = 0;
1358                 }
1359                 tsk_advance_rx_queue(sk);
1360         }
1361 exit:
1362         release_sock(sk);
1363         return res;
1364 }
1365
1366 /**
1367  * tipc_recv_stream - receive stream-oriented data
1368  * @m: descriptor for message info
1369  * @buf_len: total size of user buffer area
1370  * @flags: receive flags
1371  *
1372  * Used for SOCK_STREAM messages only.  If not enough data is available
1373  * will optionally wait for more; never truncates data.
1374  *
1375  * Returns size of returned message data, errno otherwise
1376  */
1377 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1378                             size_t buf_len, int flags)
1379 {
1380         struct sock *sk = sock->sk;
1381         struct tipc_sock *tsk = tipc_sk(sk);
1382         struct sk_buff *buf;
1383         struct tipc_msg *msg;
1384         long timeo;
1385         unsigned int sz;
1386         int sz_to_copy, target, needed;
1387         int sz_copied = 0;
1388         u32 err;
1389         int res = 0;
1390
1391         /* Catch invalid receive attempts */
1392         if (unlikely(!buf_len))
1393                 return -EINVAL;
1394
1395         lock_sock(sk);
1396
1397         if (unlikely(sock->state == SS_UNCONNECTED)) {
1398                 res = -ENOTCONN;
1399                 goto exit;
1400         }
1401
1402         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1403         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1404
1405 restart:
1406         /* Look for a message in receive queue; wait if necessary */
1407         res = tipc_wait_for_rcvmsg(sock, &timeo);
1408         if (res)
1409                 goto exit;
1410
1411         /* Look at first message in receive queue */
1412         buf = skb_peek(&sk->sk_receive_queue);
1413         msg = buf_msg(buf);
1414         sz = msg_data_sz(msg);
1415         err = msg_errcode(msg);
1416
1417         /* Discard an empty non-errored message & try again */
1418         if ((!sz) && (!err)) {
1419                 tsk_advance_rx_queue(sk);
1420                 goto restart;
1421         }
1422
1423         /* Optionally capture sender's address & ancillary data of first msg */
1424         if (sz_copied == 0) {
1425                 set_orig_addr(m, msg);
1426                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1427                 if (res)
1428                         goto exit;
1429         }
1430
1431         /* Capture message data (if valid) & compute return value (always) */
1432         if (!err) {
1433                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1434
1435                 sz -= offset;
1436                 needed = (buf_len - sz_copied);
1437                 sz_to_copy = (sz <= needed) ? sz : needed;
1438
1439                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1440                                             m, sz_to_copy);
1441                 if (res)
1442                         goto exit;
1443
1444                 sz_copied += sz_to_copy;
1445
1446                 if (sz_to_copy < sz) {
1447                         if (!(flags & MSG_PEEK))
1448                                 TIPC_SKB_CB(buf)->handle =
1449                                 (void *)(unsigned long)(offset + sz_to_copy);
1450                         goto exit;
1451                 }
1452         } else {
1453                 if (sz_copied != 0)
1454                         goto exit; /* can't add error msg to valid data */
1455
1456                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1457                         res = 0;
1458                 else
1459                         res = -ECONNRESET;
1460         }
1461
1462         /* Consume received message (optional) */
1463         if (likely(!(flags & MSG_PEEK))) {
1464                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1465                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1466                         tsk->rcv_unacked = 0;
1467                 }
1468                 tsk_advance_rx_queue(sk);
1469         }
1470
1471         /* Loop around if more data is required */
1472         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1473             (!skb_queue_empty(&sk->sk_receive_queue) ||
1474             (sz_copied < target)) &&    /* and more is ready or required */
1475             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1476             (!err))                     /* and haven't reached a FIN */
1477                 goto restart;
1478
1479 exit:
1480         release_sock(sk);
1481         return sz_copied ? sz_copied : res;
1482 }
1483
1484 /**
1485  * tipc_write_space - wake up thread if port congestion is released
1486  * @sk: socket
1487  */
1488 static void tipc_write_space(struct sock *sk)
1489 {
1490         struct socket_wq *wq;
1491
1492         rcu_read_lock();
1493         wq = rcu_dereference(sk->sk_wq);
1494         if (wq_has_sleeper(wq))
1495                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1496                                                 POLLWRNORM | POLLWRBAND);
1497         rcu_read_unlock();
1498 }
1499
1500 /**
1501  * tipc_data_ready - wake up threads to indicate messages have been received
1502  * @sk: socket
1503  * @len: the length of messages
1504  */
1505 static void tipc_data_ready(struct sock *sk)
1506 {
1507         struct socket_wq *wq;
1508
1509         rcu_read_lock();
1510         wq = rcu_dereference(sk->sk_wq);
1511         if (wq_has_sleeper(wq))
1512                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1513                                                 POLLRDNORM | POLLRDBAND);
1514         rcu_read_unlock();
1515 }
1516
1517 static void tipc_sock_destruct(struct sock *sk)
1518 {
1519         __skb_queue_purge(&sk->sk_receive_queue);
1520 }
1521
1522 /**
1523  * filter_connect - Handle all incoming messages for a connection-based socket
1524  * @tsk: TIPC socket
1525  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1526  *
1527  * Returns true if everything ok, false otherwise
1528  */
1529 static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1530 {
1531         struct sock *sk = &tsk->sk;
1532         struct net *net = sock_net(sk);
1533         struct socket *sock = sk->sk_socket;
1534         struct tipc_msg *hdr = buf_msg(skb);
1535
1536         if (unlikely(msg_mcast(hdr)))
1537                 return false;
1538
1539         switch ((int)sock->state) {
1540         case SS_CONNECTED:
1541
1542                 /* Accept only connection-based messages sent by peer */
1543                 if (unlikely(!tsk_peer_msg(tsk, hdr)))
1544                         return false;
1545
1546                 if (unlikely(msg_errcode(hdr))) {
1547                         sock->state = SS_DISCONNECTING;
1548                         tsk->connected = 0;
1549                         /* Let timer expire on it's own */
1550                         tipc_node_remove_conn(net, tsk_peer_node(tsk),
1551                                               tsk->portid);
1552                 }
1553                 return true;
1554
1555         case SS_CONNECTING:
1556
1557                 /* Accept only ACK or NACK message */
1558                 if (unlikely(!msg_connected(hdr)))
1559                         return false;
1560
1561                 if (unlikely(msg_errcode(hdr))) {
1562                         sock->state = SS_DISCONNECTING;
1563                         sk->sk_err = ECONNREFUSED;
1564                         return true;
1565                 }
1566
1567                 if (unlikely(!msg_isdata(hdr))) {
1568                         sock->state = SS_DISCONNECTING;
1569                         sk->sk_err = EINVAL;
1570                         return true;
1571                 }
1572
1573                 tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
1574                 msg_set_importance(&tsk->phdr, msg_importance(hdr));
1575                 sock->state = SS_CONNECTED;
1576
1577                 /* If 'ACK+' message, add to socket receive queue */
1578                 if (msg_data_sz(hdr))
1579                         return true;
1580
1581                 /* If empty 'ACK-' message, wake up sleeping connect() */
1582                 if (waitqueue_active(sk_sleep(sk)))
1583                         wake_up_interruptible(sk_sleep(sk));
1584
1585                 /* 'ACK-' message is neither accepted nor rejected: */
1586                 msg_set_dest_droppable(hdr, 1);
1587                 return false;
1588
1589         case SS_LISTENING:
1590         case SS_UNCONNECTED:
1591
1592                 /* Accept only SYN message */
1593                 if (!msg_connected(hdr) && !(msg_errcode(hdr)))
1594                         return true;
1595                 break;
1596         case SS_DISCONNECTING:
1597                 break;
1598         default:
1599                 pr_err("Unknown socket state %u\n", sock->state);
1600         }
1601         return false;
1602 }
1603
1604 /**
1605  * rcvbuf_limit - get proper overload limit of socket receive queue
1606  * @sk: socket
1607  * @buf: message
1608  *
1609  * For all connection oriented messages, irrespective of importance,
1610  * the default overload value (i.e. 67MB) is set as limit.
1611  *
1612  * For all connectionless messages, by default new queue limits are
1613  * as belows:
1614  *
1615  * TIPC_LOW_IMPORTANCE       (4 MB)
1616  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1617  * TIPC_HIGH_IMPORTANCE      (16 MB)
1618  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1619  *
1620  * Returns overload limit according to corresponding message importance
1621  */
1622 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1623 {
1624         struct tipc_msg *msg = buf_msg(buf);
1625
1626         if (msg_connected(msg))
1627                 return sysctl_tipc_rmem[2];
1628
1629         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1630                 msg_importance(msg);
1631 }
1632
1633 /**
1634  * filter_rcv - validate incoming message
1635  * @sk: socket
1636  * @skb: pointer to message.
1637  *
1638  * Enqueues message on receive queue if acceptable; optionally handles
1639  * disconnect indication for a connected socket.
1640  *
1641  * Called with socket lock already taken
1642  *
1643  * Returns true if message was added to socket receive queue, otherwise false
1644  */
1645 static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
1646 {
1647         struct socket *sock = sk->sk_socket;
1648         struct tipc_sock *tsk = tipc_sk(sk);
1649         struct tipc_msg *hdr = buf_msg(skb);
1650         unsigned int limit = rcvbuf_limit(sk, skb);
1651         int err = TIPC_OK;
1652         int usr = msg_user(hdr);
1653
1654         if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1655                 tipc_sk_proto_rcv(tsk, skb);
1656                 return false;
1657         }
1658
1659         if (unlikely(usr == SOCK_WAKEUP)) {
1660                 kfree_skb(skb);
1661                 tsk->link_cong = 0;
1662                 sk->sk_write_space(sk);
1663                 return false;
1664         }
1665
1666         /* Drop if illegal message type */
1667         if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1668                 kfree_skb(skb);
1669                 return false;
1670         }
1671
1672         /* Reject if wrong message type for current socket state */
1673         if (unlikely(sock->state == SS_READY)) {
1674                 if (msg_connected(hdr)) {
1675                         err = TIPC_ERR_NO_PORT;
1676                         goto reject;
1677                 }
1678         } else if (unlikely(!filter_connect(tsk, skb))) {
1679                 err = TIPC_ERR_NO_PORT;
1680                 goto reject;
1681         }
1682
1683         /* Reject message if there isn't room to queue it */
1684         if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1685                 err = TIPC_ERR_OVERLOAD;
1686                 goto reject;
1687         }
1688
1689         /* Enqueue message */
1690         TIPC_SKB_CB(skb)->handle = NULL;
1691         __skb_queue_tail(&sk->sk_receive_queue, skb);
1692         skb_set_owner_r(skb, sk);
1693
1694         sk->sk_data_ready(sk);
1695         return true;
1696
1697 reject:
1698         tipc_sk_respond(sk, skb, err);
1699         return false;
1700 }
1701
1702 /**
1703  * tipc_backlog_rcv - handle incoming message from backlog queue
1704  * @sk: socket
1705  * @skb: message
1706  *
1707  * Caller must hold socket lock
1708  *
1709  * Returns 0
1710  */
1711 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1712 {
1713         unsigned int truesize = skb->truesize;
1714
1715         if (likely(filter_rcv(sk, skb)))
1716                 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1717         return 0;
1718 }
1719
1720 /**
1721  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1722  *                   inputq and try adding them to socket or backlog queue
1723  * @inputq: list of incoming buffers with potentially different destinations
1724  * @sk: socket where the buffers should be enqueued
1725  * @dport: port number for the socket
1726  *
1727  * Caller must hold socket lock
1728  */
1729 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1730                             u32 dport)
1731 {
1732         unsigned int lim;
1733         atomic_t *dcnt;
1734         struct sk_buff *skb;
1735         unsigned long time_limit = jiffies + 2;
1736
1737         while (skb_queue_len(inputq)) {
1738                 if (unlikely(time_after_eq(jiffies, time_limit)))
1739                         return;
1740
1741                 skb = tipc_skb_dequeue(inputq, dport);
1742                 if (unlikely(!skb))
1743                         return;
1744
1745                 /* Add message directly to receive queue if possible */
1746                 if (!sock_owned_by_user(sk)) {
1747                         filter_rcv(sk, skb);
1748                         continue;
1749                 }
1750
1751                 /* Try backlog, compensating for double-counted bytes */
1752                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1753                 if (sk->sk_backlog.len)
1754                         atomic_set(dcnt, 0);
1755                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1756                 if (likely(!sk_add_backlog(sk, skb, lim)))
1757                         continue;
1758
1759                 /* Overload => reject message back to sender */
1760                 tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
1761                 break;
1762         }
1763 }
1764
1765 /**
1766  * tipc_sk_rcv - handle a chain of incoming buffers
1767  * @inputq: buffer list containing the buffers
1768  * Consumes all buffers in list until inputq is empty
1769  * Note: may be called in multiple threads referring to the same queue
1770  */
1771 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1772 {
1773         u32 dnode, dport = 0;
1774         int err;
1775         struct tipc_sock *tsk;
1776         struct sock *sk;
1777         struct sk_buff *skb;
1778
1779         while (skb_queue_len(inputq)) {
1780                 dport = tipc_skb_peek_port(inputq, dport);
1781                 tsk = tipc_sk_lookup(net, dport);
1782
1783                 if (likely(tsk)) {
1784                         sk = &tsk->sk;
1785                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1786                                 tipc_sk_enqueue(inputq, sk, dport);
1787                                 spin_unlock_bh(&sk->sk_lock.slock);
1788                         }
1789                         sock_put(sk);
1790                         continue;
1791                 }
1792
1793                 /* No destination socket => dequeue skb if still there */
1794                 skb = tipc_skb_dequeue(inputq, dport);
1795                 if (!skb)
1796                         return;
1797
1798                 /* Try secondary lookup if unresolved named message */
1799                 err = TIPC_ERR_NO_PORT;
1800                 if (tipc_msg_lookup_dest(net, skb, &err))
1801                         goto xmit;
1802
1803                 /* Prepare for message rejection */
1804                 if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1805                         continue;
1806 xmit:
1807                 dnode = msg_destnode(buf_msg(skb));
1808                 tipc_node_xmit_skb(net, skb, dnode, dport);
1809         }
1810 }
1811
1812 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1813 {
1814         struct sock *sk = sock->sk;
1815         DEFINE_WAIT(wait);
1816         int done;
1817
1818         do {
1819                 int err = sock_error(sk);
1820                 if (err)
1821                         return err;
1822                 if (!*timeo_p)
1823                         return -ETIMEDOUT;
1824                 if (signal_pending(current))
1825                         return sock_intr_errno(*timeo_p);
1826
1827                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1828                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1829                 finish_wait(sk_sleep(sk), &wait);
1830         } while (!done);
1831         return 0;
1832 }
1833
1834 /**
1835  * tipc_connect - establish a connection to another TIPC port
1836  * @sock: socket structure
1837  * @dest: socket address for destination port
1838  * @destlen: size of socket address data structure
1839  * @flags: file-related flags associated with socket
1840  *
1841  * Returns 0 on success, errno otherwise
1842  */
1843 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1844                         int destlen, int flags)
1845 {
1846         struct sock *sk = sock->sk;
1847         struct tipc_sock *tsk = tipc_sk(sk);
1848         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1849         struct msghdr m = {NULL,};
1850         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1851         socket_state previous;
1852         int res = 0;
1853
1854         lock_sock(sk);
1855
1856         /* DGRAM/RDM connect(), just save the destaddr */
1857         if (sock->state == SS_READY) {
1858                 if (dst->family == AF_UNSPEC) {
1859                         memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc));
1860                         tsk->connected = 0;
1861                 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1862                         res = -EINVAL;
1863                 } else {
1864                         memcpy(&tsk->remote, dest, destlen);
1865                         tsk->connected = 1;
1866                 }
1867                 goto exit;
1868         }
1869
1870         /*
1871          * Reject connection attempt using multicast address
1872          *
1873          * Note: send_msg() validates the rest of the address fields,
1874          *       so there's no need to do it here
1875          */
1876         if (dst->addrtype == TIPC_ADDR_MCAST) {
1877                 res = -EINVAL;
1878                 goto exit;
1879         }
1880
1881         previous = sock->state;
1882         switch (sock->state) {
1883         case SS_UNCONNECTED:
1884                 /* Send a 'SYN-' to destination */
1885                 m.msg_name = dest;
1886                 m.msg_namelen = destlen;
1887
1888                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1889                  * indicate send_msg() is never blocked.
1890                  */
1891                 if (!timeout)
1892                         m.msg_flags = MSG_DONTWAIT;
1893
1894                 res = __tipc_sendmsg(sock, &m, 0);
1895                 if ((res < 0) && (res != -EWOULDBLOCK))
1896                         goto exit;
1897
1898                 /* Just entered SS_CONNECTING state; the only
1899                  * difference is that return value in non-blocking
1900                  * case is EINPROGRESS, rather than EALREADY.
1901                  */
1902                 res = -EINPROGRESS;
1903         case SS_CONNECTING:
1904                 if (previous == SS_CONNECTING)
1905                         res = -EALREADY;
1906                 if (!timeout)
1907                         goto exit;
1908                 timeout = msecs_to_jiffies(timeout);
1909                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1910                 res = tipc_wait_for_connect(sock, &timeout);
1911                 break;
1912         case SS_CONNECTED:
1913                 res = -EISCONN;
1914                 break;
1915         default:
1916                 res = -EINVAL;
1917                 break;
1918         }
1919 exit:
1920         release_sock(sk);
1921         return res;
1922 }
1923
1924 /**
1925  * tipc_listen - allow socket to listen for incoming connections
1926  * @sock: socket structure
1927  * @len: (unused)
1928  *
1929  * Returns 0 on success, errno otherwise
1930  */
1931 static int tipc_listen(struct socket *sock, int len)
1932 {
1933         struct sock *sk = sock->sk;
1934         int res;
1935
1936         lock_sock(sk);
1937
1938         if (sock->state != SS_UNCONNECTED)
1939                 res = -EINVAL;
1940         else {
1941                 sock->state = SS_LISTENING;
1942                 res = 0;
1943         }
1944
1945         release_sock(sk);
1946         return res;
1947 }
1948
1949 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1950 {
1951         struct sock *sk = sock->sk;
1952         DEFINE_WAIT(wait);
1953         int err;
1954
1955         /* True wake-one mechanism for incoming connections: only
1956          * one process gets woken up, not the 'whole herd'.
1957          * Since we do not 'race & poll' for established sockets
1958          * anymore, the common case will execute the loop only once.
1959         */
1960         for (;;) {
1961                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1962                                           TASK_INTERRUPTIBLE);
1963                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1964                         release_sock(sk);
1965                         timeo = schedule_timeout(timeo);
1966                         lock_sock(sk);
1967                 }
1968                 err = 0;
1969                 if (!skb_queue_empty(&sk->sk_receive_queue))
1970                         break;
1971                 err = -EINVAL;
1972                 if (sock->state != SS_LISTENING)
1973                         break;
1974                 err = -EAGAIN;
1975                 if (!timeo)
1976                         break;
1977                 err = sock_intr_errno(timeo);
1978                 if (signal_pending(current))
1979                         break;
1980         }
1981         finish_wait(sk_sleep(sk), &wait);
1982         return err;
1983 }
1984
1985 /**
1986  * tipc_accept - wait for connection request
1987  * @sock: listening socket
1988  * @newsock: new socket that is to be connected
1989  * @flags: file-related flags associated with socket
1990  *
1991  * Returns 0 on success, errno otherwise
1992  */
1993 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1994 {
1995         struct sock *new_sk, *sk = sock->sk;
1996         struct sk_buff *buf;
1997         struct tipc_sock *new_tsock;
1998         struct tipc_msg *msg;
1999         long timeo;
2000         int res;
2001
2002         lock_sock(sk);
2003
2004         if (sock->state != SS_LISTENING) {
2005                 res = -EINVAL;
2006                 goto exit;
2007         }
2008         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2009         res = tipc_wait_for_accept(sock, timeo);
2010         if (res)
2011                 goto exit;
2012
2013         buf = skb_peek(&sk->sk_receive_queue);
2014
2015         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2016         if (res)
2017                 goto exit;
2018         security_sk_clone(sock->sk, new_sock->sk);
2019
2020         new_sk = new_sock->sk;
2021         new_tsock = tipc_sk(new_sk);
2022         msg = buf_msg(buf);
2023
2024         /* we lock on new_sk; but lockdep sees the lock on sk */
2025         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2026
2027         /*
2028          * Reject any stray messages received by new socket
2029          * before the socket lock was taken (very, very unlikely)
2030          */
2031         tsk_rej_rx_queue(new_sk);
2032
2033         /* Connect new socket to it's peer */
2034         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2035         new_sock->state = SS_CONNECTED;
2036
2037         tsk_set_importance(new_tsock, msg_importance(msg));
2038         if (msg_named(msg)) {
2039                 new_tsock->conn_type = msg_nametype(msg);
2040                 new_tsock->conn_instance = msg_nameinst(msg);
2041         }
2042
2043         /*
2044          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2045          * Respond to 'SYN+' by queuing it on new socket.
2046          */
2047         if (!msg_data_sz(msg)) {
2048                 struct msghdr m = {NULL,};
2049
2050                 tsk_advance_rx_queue(sk);
2051                 __tipc_send_stream(new_sock, &m, 0);
2052         } else {
2053                 __skb_dequeue(&sk->sk_receive_queue);
2054                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2055                 skb_set_owner_r(buf, new_sk);
2056         }
2057         release_sock(new_sk);
2058 exit:
2059         release_sock(sk);
2060         return res;
2061 }
2062
2063 /**
2064  * tipc_shutdown - shutdown socket connection
2065  * @sock: socket structure
2066  * @how: direction to close (must be SHUT_RDWR)
2067  *
2068  * Terminates connection (if necessary), then purges socket's receive queue.
2069  *
2070  * Returns 0 on success, errno otherwise
2071  */
2072 static int tipc_shutdown(struct socket *sock, int how)
2073 {
2074         struct sock *sk = sock->sk;
2075         struct net *net = sock_net(sk);
2076         struct tipc_sock *tsk = tipc_sk(sk);
2077         struct sk_buff *skb;
2078         u32 dnode = tsk_peer_node(tsk);
2079         u32 dport = tsk_peer_port(tsk);
2080         u32 onode = tipc_own_addr(net);
2081         u32 oport = tsk->portid;
2082         int res;
2083
2084         if (how != SHUT_RDWR)
2085                 return -EINVAL;
2086
2087         lock_sock(sk);
2088
2089         switch (sock->state) {
2090         case SS_CONNECTING:
2091         case SS_CONNECTED:
2092
2093 restart:
2094                 dnode = tsk_peer_node(tsk);
2095
2096                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2097                 skb = __skb_dequeue(&sk->sk_receive_queue);
2098                 if (skb) {
2099                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2100                                 kfree_skb(skb);
2101                                 goto restart;
2102                         }
2103                         tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
2104                 } else {
2105                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2106                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2107                                               0, dnode, onode, dport, oport,
2108                                               TIPC_CONN_SHUTDOWN);
2109                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2110                 }
2111                 tsk->connected = 0;
2112                 sock->state = SS_DISCONNECTING;
2113                 tipc_node_remove_conn(net, dnode, tsk->portid);
2114                 /* fall through */
2115
2116         case SS_DISCONNECTING:
2117
2118                 /* Discard any unreceived messages */
2119                 __skb_queue_purge(&sk->sk_receive_queue);
2120
2121                 /* Wake up anyone sleeping in poll */
2122                 sk->sk_state_change(sk);
2123                 res = 0;
2124                 break;
2125
2126         default:
2127                 res = -ENOTCONN;
2128         }
2129
2130         release_sock(sk);
2131         return res;
2132 }
2133
2134 static void tipc_sk_timeout(unsigned long data)
2135 {
2136         struct tipc_sock *tsk = (struct tipc_sock *)data;
2137         struct sock *sk = &tsk->sk;
2138         struct sk_buff *skb = NULL;
2139         u32 peer_port, peer_node;
2140         u32 own_node = tsk_own_node(tsk);
2141
2142         bh_lock_sock(sk);
2143         if (!tsk->connected) {
2144                 bh_unlock_sock(sk);
2145                 goto exit;
2146         }
2147         peer_port = tsk_peer_port(tsk);
2148         peer_node = tsk_peer_node(tsk);
2149
2150         if (tsk->probing_state == TIPC_CONN_PROBING) {
2151                 if (!sock_owned_by_user(sk)) {
2152                         sk->sk_socket->state = SS_DISCONNECTING;
2153                         tsk->connected = 0;
2154                         tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2155                                               tsk_peer_port(tsk));
2156                         sk->sk_state_change(sk);
2157                 } else {
2158                         /* Try again later */
2159                         sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2160                 }
2161
2162         } else {
2163                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2164                                       INT_H_SIZE, 0, peer_node, own_node,
2165                                       peer_port, tsk->portid, TIPC_OK);
2166                 tsk->probing_state = TIPC_CONN_PROBING;
2167                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2168         }
2169         bh_unlock_sock(sk);
2170         if (skb)
2171                 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2172 exit:
2173         sock_put(sk);
2174 }
2175
2176 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2177                            struct tipc_name_seq const *seq)
2178 {
2179         struct net *net = sock_net(&tsk->sk);
2180         struct publication *publ;
2181         u32 key;
2182
2183         if (tsk->connected)
2184                 return -EINVAL;
2185         key = tsk->portid + tsk->pub_count + 1;
2186         if (key == tsk->portid)
2187                 return -EADDRINUSE;
2188
2189         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2190                                     scope, tsk->portid, key);
2191         if (unlikely(!publ))
2192                 return -EINVAL;
2193
2194         list_add(&publ->pport_list, &tsk->publications);
2195         tsk->pub_count++;
2196         tsk->published = 1;
2197         return 0;
2198 }
2199
2200 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2201                             struct tipc_name_seq const *seq)
2202 {
2203         struct net *net = sock_net(&tsk->sk);
2204         struct publication *publ;
2205         struct publication *safe;
2206         int rc = -EINVAL;
2207
2208         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2209                 if (seq) {
2210                         if (publ->scope != scope)
2211                                 continue;
2212                         if (publ->type != seq->type)
2213                                 continue;
2214                         if (publ->lower != seq->lower)
2215                                 continue;
2216                         if (publ->upper != seq->upper)
2217                                 break;
2218                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2219                                               publ->ref, publ->key);
2220                         rc = 0;
2221                         break;
2222                 }
2223                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2224                                       publ->ref, publ->key);
2225                 rc = 0;
2226         }
2227         if (list_empty(&tsk->publications))
2228                 tsk->published = 0;
2229         return rc;
2230 }
2231
2232 /* tipc_sk_reinit: set non-zero address in all existing sockets
2233  *                 when we go from standalone to network mode.
2234  */
2235 void tipc_sk_reinit(struct net *net)
2236 {
2237         struct tipc_net *tn = net_generic(net, tipc_net_id);
2238         const struct bucket_table *tbl;
2239         struct rhash_head *pos;
2240         struct tipc_sock *tsk;
2241         struct tipc_msg *msg;
2242         int i;
2243
2244         rcu_read_lock();
2245         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2246         for (i = 0; i < tbl->size; i++) {
2247                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2248                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2249                         msg = &tsk->phdr;
2250                         msg_set_prevnode(msg, tn->own_addr);
2251                         msg_set_orignode(msg, tn->own_addr);
2252                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2253                 }
2254         }
2255         rcu_read_unlock();
2256 }
2257
2258 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2259 {
2260         struct tipc_net *tn = net_generic(net, tipc_net_id);
2261         struct tipc_sock *tsk;
2262
2263         rcu_read_lock();
2264         tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2265         if (tsk)
2266                 sock_hold(&tsk->sk);
2267         rcu_read_unlock();
2268
2269         return tsk;
2270 }
2271
2272 static int tipc_sk_insert(struct tipc_sock *tsk)
2273 {
2274         struct sock *sk = &tsk->sk;
2275         struct net *net = sock_net(sk);
2276         struct tipc_net *tn = net_generic(net, tipc_net_id);
2277         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2278         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2279
2280         while (remaining--) {
2281                 portid++;
2282                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2283                         portid = TIPC_MIN_PORT;
2284                 tsk->portid = portid;
2285                 sock_hold(&tsk->sk);
2286                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2287                                                    tsk_rht_params))
2288                         return 0;
2289                 sock_put(&tsk->sk);
2290         }
2291
2292         return -1;
2293 }
2294
2295 static void tipc_sk_remove(struct tipc_sock *tsk)
2296 {
2297         struct sock *sk = &tsk->sk;
2298         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2299
2300         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2301                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2302                 __sock_put(sk);
2303         }
2304 }
2305
2306 static const struct rhashtable_params tsk_rht_params = {
2307         .nelem_hint = 192,
2308         .head_offset = offsetof(struct tipc_sock, node),
2309         .key_offset = offsetof(struct tipc_sock, portid),
2310         .key_len = sizeof(u32), /* portid */
2311         .max_size = 1048576,
2312         .min_size = 256,
2313         .automatic_shrinking = true,
2314 };
2315
2316 int tipc_sk_rht_init(struct net *net)
2317 {
2318         struct tipc_net *tn = net_generic(net, tipc_net_id);
2319
2320         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2321 }
2322
2323 void tipc_sk_rht_destroy(struct net *net)
2324 {
2325         struct tipc_net *tn = net_generic(net, tipc_net_id);
2326
2327         /* Wait for socket readers to complete */
2328         synchronize_net();
2329
2330         rhashtable_destroy(&tn->sk_rht);
2331 }
2332
2333 /**
2334  * tipc_setsockopt - set socket option
2335  * @sock: socket structure
2336  * @lvl: option level
2337  * @opt: option identifier
2338  * @ov: pointer to new option value
2339  * @ol: length of option value
2340  *
2341  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2342  * (to ease compatibility).
2343  *
2344  * Returns 0 on success, errno otherwise
2345  */
2346 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2347                            char __user *ov, unsigned int ol)
2348 {
2349         struct sock *sk = sock->sk;
2350         struct tipc_sock *tsk = tipc_sk(sk);
2351         u32 value;
2352         int res;
2353
2354         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2355                 return 0;
2356         if (lvl != SOL_TIPC)
2357                 return -ENOPROTOOPT;
2358         if (ol < sizeof(value))
2359                 return -EINVAL;
2360         res = get_user(value, (u32 __user *)ov);
2361         if (res)
2362                 return res;
2363
2364         lock_sock(sk);
2365
2366         switch (opt) {
2367         case TIPC_IMPORTANCE:
2368                 res = tsk_set_importance(tsk, value);
2369                 break;
2370         case TIPC_SRC_DROPPABLE:
2371                 if (sock->type != SOCK_STREAM)
2372                         tsk_set_unreliable(tsk, value);
2373                 else
2374                         res = -ENOPROTOOPT;
2375                 break;
2376         case TIPC_DEST_DROPPABLE:
2377                 tsk_set_unreturnable(tsk, value);
2378                 break;
2379         case TIPC_CONN_TIMEOUT:
2380                 tipc_sk(sk)->conn_timeout = value;
2381                 /* no need to set "res", since already 0 at this point */
2382                 break;
2383         default:
2384                 res = -EINVAL;
2385         }
2386
2387         release_sock(sk);
2388
2389         return res;
2390 }
2391
2392 /**
2393  * tipc_getsockopt - get socket option
2394  * @sock: socket structure
2395  * @lvl: option level
2396  * @opt: option identifier
2397  * @ov: receptacle for option value
2398  * @ol: receptacle for length of option value
2399  *
2400  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2401  * (to ease compatibility).
2402  *
2403  * Returns 0 on success, errno otherwise
2404  */
2405 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2406                            char __user *ov, int __user *ol)
2407 {
2408         struct sock *sk = sock->sk;
2409         struct tipc_sock *tsk = tipc_sk(sk);
2410         int len;
2411         u32 value;
2412         int res;
2413
2414         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2415                 return put_user(0, ol);
2416         if (lvl != SOL_TIPC)
2417                 return -ENOPROTOOPT;
2418         res = get_user(len, ol);
2419         if (res)
2420                 return res;
2421
2422         lock_sock(sk);
2423
2424         switch (opt) {
2425         case TIPC_IMPORTANCE:
2426                 value = tsk_importance(tsk);
2427                 break;
2428         case TIPC_SRC_DROPPABLE:
2429                 value = tsk_unreliable(tsk);
2430                 break;
2431         case TIPC_DEST_DROPPABLE:
2432                 value = tsk_unreturnable(tsk);
2433                 break;
2434         case TIPC_CONN_TIMEOUT:
2435                 value = tsk->conn_timeout;
2436                 /* no need to set "res", since already 0 at this point */
2437                 break;
2438         case TIPC_NODE_RECVQ_DEPTH:
2439                 value = 0; /* was tipc_queue_size, now obsolete */
2440                 break;
2441         case TIPC_SOCK_RECVQ_DEPTH:
2442                 value = skb_queue_len(&sk->sk_receive_queue);
2443                 break;
2444         default:
2445                 res = -EINVAL;
2446         }
2447
2448         release_sock(sk);
2449
2450         if (res)
2451                 return res;     /* "get" failed */
2452
2453         if (len < sizeof(value))
2454                 return -EINVAL;
2455
2456         if (copy_to_user(ov, &value, sizeof(value)))
2457                 return -EFAULT;
2458
2459         return put_user(sizeof(value), ol);
2460 }
2461
2462 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2463 {
2464         struct sock *sk = sock->sk;
2465         struct tipc_sioc_ln_req lnr;
2466         void __user *argp = (void __user *)arg;
2467
2468         switch (cmd) {
2469         case SIOCGETLINKNAME:
2470                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2471                         return -EFAULT;
2472                 if (!tipc_node_get_linkname(sock_net(sk),
2473                                             lnr.bearer_id & 0xffff, lnr.peer,
2474                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2475                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2476                                 return -EFAULT;
2477                         return 0;
2478                 }
2479                 return -EADDRNOTAVAIL;
2480         default:
2481                 return -ENOIOCTLCMD;
2482         }
2483 }
2484
2485 /* Protocol switches for the various types of TIPC sockets */
2486
2487 static const struct proto_ops msg_ops = {
2488         .owner          = THIS_MODULE,
2489         .family         = AF_TIPC,
2490         .release        = tipc_release,
2491         .bind           = tipc_bind,
2492         .connect        = tipc_connect,
2493         .socketpair     = sock_no_socketpair,
2494         .accept         = sock_no_accept,
2495         .getname        = tipc_getname,
2496         .poll           = tipc_poll,
2497         .ioctl          = tipc_ioctl,
2498         .listen         = sock_no_listen,
2499         .shutdown       = tipc_shutdown,
2500         .setsockopt     = tipc_setsockopt,
2501         .getsockopt     = tipc_getsockopt,
2502         .sendmsg        = tipc_sendmsg,
2503         .recvmsg        = tipc_recvmsg,
2504         .mmap           = sock_no_mmap,
2505         .sendpage       = sock_no_sendpage
2506 };
2507
2508 static const struct proto_ops packet_ops = {
2509         .owner          = THIS_MODULE,
2510         .family         = AF_TIPC,
2511         .release        = tipc_release,
2512         .bind           = tipc_bind,
2513         .connect        = tipc_connect,
2514         .socketpair     = sock_no_socketpair,
2515         .accept         = tipc_accept,
2516         .getname        = tipc_getname,
2517         .poll           = tipc_poll,
2518         .ioctl          = tipc_ioctl,
2519         .listen         = tipc_listen,
2520         .shutdown       = tipc_shutdown,
2521         .setsockopt     = tipc_setsockopt,
2522         .getsockopt     = tipc_getsockopt,
2523         .sendmsg        = tipc_send_packet,
2524         .recvmsg        = tipc_recvmsg,
2525         .mmap           = sock_no_mmap,
2526         .sendpage       = sock_no_sendpage
2527 };
2528
2529 static const struct proto_ops stream_ops = {
2530         .owner          = THIS_MODULE,
2531         .family         = AF_TIPC,
2532         .release        = tipc_release,
2533         .bind           = tipc_bind,
2534         .connect        = tipc_connect,
2535         .socketpair     = sock_no_socketpair,
2536         .accept         = tipc_accept,
2537         .getname        = tipc_getname,
2538         .poll           = tipc_poll,
2539         .ioctl          = tipc_ioctl,
2540         .listen         = tipc_listen,
2541         .shutdown       = tipc_shutdown,
2542         .setsockopt     = tipc_setsockopt,
2543         .getsockopt     = tipc_getsockopt,
2544         .sendmsg        = tipc_send_stream,
2545         .recvmsg        = tipc_recv_stream,
2546         .mmap           = sock_no_mmap,
2547         .sendpage       = sock_no_sendpage
2548 };
2549
2550 static const struct net_proto_family tipc_family_ops = {
2551         .owner          = THIS_MODULE,
2552         .family         = AF_TIPC,
2553         .create         = tipc_sk_create
2554 };
2555
2556 static struct proto tipc_proto = {
2557         .name           = "TIPC",
2558         .owner          = THIS_MODULE,
2559         .obj_size       = sizeof(struct tipc_sock),
2560         .sysctl_rmem    = sysctl_tipc_rmem
2561 };
2562
2563 /**
2564  * tipc_socket_init - initialize TIPC socket interface
2565  *
2566  * Returns 0 on success, errno otherwise
2567  */
2568 int tipc_socket_init(void)
2569 {
2570         int res;
2571
2572         res = proto_register(&tipc_proto, 1);
2573         if (res) {
2574                 pr_err("Failed to register TIPC protocol type\n");
2575                 goto out;
2576         }
2577
2578         res = sock_register(&tipc_family_ops);
2579         if (res) {
2580                 pr_err("Failed to register TIPC socket type\n");
2581                 proto_unregister(&tipc_proto);
2582                 goto out;
2583         }
2584  out:
2585         return res;
2586 }
2587
2588 /**
2589  * tipc_socket_stop - stop TIPC socket interface
2590  */
2591 void tipc_socket_stop(void)
2592 {
2593         sock_unregister(tipc_family_ops.family);
2594         proto_unregister(&tipc_proto);
2595 }
2596
2597 /* Caller should hold socket lock for the passed tipc socket. */
2598 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2599 {
2600         u32 peer_node;
2601         u32 peer_port;
2602         struct nlattr *nest;
2603
2604         peer_node = tsk_peer_node(tsk);
2605         peer_port = tsk_peer_port(tsk);
2606
2607         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2608
2609         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2610                 goto msg_full;
2611         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2612                 goto msg_full;
2613
2614         if (tsk->conn_type != 0) {
2615                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2616                         goto msg_full;
2617                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2618                         goto msg_full;
2619                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2620                         goto msg_full;
2621         }
2622         nla_nest_end(skb, nest);
2623
2624         return 0;
2625
2626 msg_full:
2627         nla_nest_cancel(skb, nest);
2628
2629         return -EMSGSIZE;
2630 }
2631
2632 /* Caller should hold socket lock for the passed tipc socket. */
2633 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2634                             struct tipc_sock *tsk)
2635 {
2636         int err;
2637         void *hdr;
2638         struct nlattr *attrs;
2639         struct net *net = sock_net(skb->sk);
2640         struct tipc_net *tn = net_generic(net, tipc_net_id);
2641
2642         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2643                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2644         if (!hdr)
2645                 goto msg_cancel;
2646
2647         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2648         if (!attrs)
2649                 goto genlmsg_cancel;
2650         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2651                 goto attr_msg_cancel;
2652         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2653                 goto attr_msg_cancel;
2654
2655         if (tsk->connected) {
2656                 err = __tipc_nl_add_sk_con(skb, tsk);
2657                 if (err)
2658                         goto attr_msg_cancel;
2659         } else if (!list_empty(&tsk->publications)) {
2660                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2661                         goto attr_msg_cancel;
2662         }
2663         nla_nest_end(skb, attrs);
2664         genlmsg_end(skb, hdr);
2665
2666         return 0;
2667
2668 attr_msg_cancel:
2669         nla_nest_cancel(skb, attrs);
2670 genlmsg_cancel:
2671         genlmsg_cancel(skb, hdr);
2672 msg_cancel:
2673         return -EMSGSIZE;
2674 }
2675
2676 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2677 {
2678         int err;
2679         struct tipc_sock *tsk;
2680         const struct bucket_table *tbl;
2681         struct rhash_head *pos;
2682         struct net *net = sock_net(skb->sk);
2683         struct tipc_net *tn = net_generic(net, tipc_net_id);
2684         u32 tbl_id = cb->args[0];
2685         u32 prev_portid = cb->args[1];
2686
2687         rcu_read_lock();
2688         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2689         for (; tbl_id < tbl->size; tbl_id++) {
2690                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2691                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2692                         if (prev_portid && prev_portid != tsk->portid) {
2693                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2694                                 continue;
2695                         }
2696
2697                         err = __tipc_nl_add_sk(skb, cb, tsk);
2698                         if (err) {
2699                                 prev_portid = tsk->portid;
2700                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2701                                 goto out;
2702                         }
2703                         prev_portid = 0;
2704                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2705                 }
2706         }
2707 out:
2708         rcu_read_unlock();
2709         cb->args[0] = tbl_id;
2710         cb->args[1] = prev_portid;
2711
2712         return skb->len;
2713 }
2714
2715 /* Caller should hold socket lock for the passed tipc socket. */
2716 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2717                                  struct netlink_callback *cb,
2718                                  struct publication *publ)
2719 {
2720         void *hdr;
2721         struct nlattr *attrs;
2722
2723         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2724                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2725         if (!hdr)
2726                 goto msg_cancel;
2727
2728         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2729         if (!attrs)
2730                 goto genlmsg_cancel;
2731
2732         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2733                 goto attr_msg_cancel;
2734         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2735                 goto attr_msg_cancel;
2736         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2737                 goto attr_msg_cancel;
2738         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2739                 goto attr_msg_cancel;
2740
2741         nla_nest_end(skb, attrs);
2742         genlmsg_end(skb, hdr);
2743
2744         return 0;
2745
2746 attr_msg_cancel:
2747         nla_nest_cancel(skb, attrs);
2748 genlmsg_cancel:
2749         genlmsg_cancel(skb, hdr);
2750 msg_cancel:
2751         return -EMSGSIZE;
2752 }
2753
2754 /* Caller should hold socket lock for the passed tipc socket. */
2755 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2756                                   struct netlink_callback *cb,
2757                                   struct tipc_sock *tsk, u32 *last_publ)
2758 {
2759         int err;
2760         struct publication *p;
2761
2762         if (*last_publ) {
2763                 list_for_each_entry(p, &tsk->publications, pport_list) {
2764                         if (p->key == *last_publ)
2765                                 break;
2766                 }
2767                 if (p->key != *last_publ) {
2768                         /* We never set seq or call nl_dump_check_consistent()
2769                          * this means that setting prev_seq here will cause the
2770                          * consistence check to fail in the netlink callback
2771                          * handler. Resulting in the last NLMSG_DONE message
2772                          * having the NLM_F_DUMP_INTR flag set.
2773                          */
2774                         cb->prev_seq = 1;
2775                         *last_publ = 0;
2776                         return -EPIPE;
2777                 }
2778         } else {
2779                 p = list_first_entry(&tsk->publications, struct publication,
2780                                      pport_list);
2781         }
2782
2783         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2784                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2785                 if (err) {
2786                         *last_publ = p->key;
2787                         return err;
2788                 }
2789         }
2790         *last_publ = 0;
2791
2792         return 0;
2793 }
2794
2795 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2796 {
2797         int err;
2798         u32 tsk_portid = cb->args[0];
2799         u32 last_publ = cb->args[1];
2800         u32 done = cb->args[2];
2801         struct net *net = sock_net(skb->sk);
2802         struct tipc_sock *tsk;
2803
2804         if (!tsk_portid) {
2805                 struct nlattr **attrs;
2806                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2807
2808                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2809                 if (err)
2810                         return err;
2811
2812                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2813                                        attrs[TIPC_NLA_SOCK],
2814                                        tipc_nl_sock_policy);
2815                 if (err)
2816                         return err;
2817
2818                 if (!sock[TIPC_NLA_SOCK_REF])
2819                         return -EINVAL;
2820
2821                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2822         }
2823
2824         if (done)
2825                 return 0;
2826
2827         tsk = tipc_sk_lookup(net, tsk_portid);
2828         if (!tsk)
2829                 return -EINVAL;
2830
2831         lock_sock(&tsk->sk);
2832         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2833         if (!err)
2834                 done = 1;
2835         release_sock(&tsk->sk);
2836         sock_put(&tsk->sk);
2837
2838         cb->args[0] = tsk_portid;
2839         cb->args[1] = last_publ;
2840         cb->args[2] = done;
2841
2842         return skb->len;
2843 }