These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / net / tipc / socket.c
index 20cc6df..b53246f 100644 (file)
@@ -41,6 +41,7 @@
 #include "link.h"
 #include "name_distr.h"
 #include "socket.h"
+#include "bcast.h"
 
 #define SS_LISTENING           -1      /* socket is listening */
 #define SS_READY               -2      /* socket is connectionless */
@@ -104,6 +105,7 @@ struct tipc_sock {
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static void tipc_data_ready(struct sock *sk);
 static void tipc_write_space(struct sock *sk);
+static void tipc_sock_destruct(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
@@ -247,6 +249,22 @@ static void tsk_advance_rx_queue(struct sock *sk)
        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
+/* tipc_sk_respond() : send response message back to sender
+ */
+static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
+{
+       u32 selector;
+       u32 dnode;
+       u32 onode = tipc_own_addr(sock_net(sk));
+
+       if (!tipc_msg_reverse(onode, &skb, err))
+               return;
+
+       dnode = msg_destnode(buf_msg(skb));
+       selector = msg_origport(buf_msg(skb));
+       tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
+}
+
 /**
  * tsk_rej_rx_queue - reject all buffers in socket receive queue
  *
@@ -255,13 +273,9 @@ static void tsk_advance_rx_queue(struct sock *sk)
 static void tsk_rej_rx_queue(struct sock *sk)
 {
        struct sk_buff *skb;
-       u32 dnode;
-       u32 own_node = tsk_own_node(tipc_sk(sk));
 
-       while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
-               if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
-                       tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
-       }
+       while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
+               tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
 }
 
 /* tsk_peer_msg - verify if message was sent by connected port's peer
@@ -342,7 +356,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        }
 
        /* Allocate socket's protocol area */
-       sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
+       sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
        if (sk == NULL)
                return -ENOMEM;
 
@@ -368,6 +382,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
        sk->sk_data_ready = tipc_data_ready;
        sk->sk_write_space = tipc_write_space;
+       sk->sk_destruct = tipc_sock_destruct;
        tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
        tsk->sent_unacked = 0;
        atomic_set(&tsk->dupl_rcvcnt, 0);
@@ -409,7 +424,7 @@ static int tipc_release(struct socket *sock)
        struct net *net;
        struct tipc_sock *tsk;
        struct sk_buff *skb;
-       u32 dnode, probing_state;
+       u32 dnode;
 
        /*
         * Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -440,17 +455,12 @@ static int tipc_release(struct socket *sock)
                                tsk->connected = 0;
                                tipc_node_remove_conn(net, dnode, tsk->portid);
                        }
-                       if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
-                                            TIPC_ERR_NO_PORT))
-                               tipc_link_xmit_skb(net, skb, dnode, 0);
+                       tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
                }
        }
 
        tipc_sk_withdraw(tsk, 0, NULL);
-       probing_state = tsk->probing_state;
-       if (del_timer_sync(&sk->sk_timer) &&
-           probing_state != TIPC_CONN_PROBING)
-               sock_put(sk);
+       sk_stop_timer(sk, &sk->sk_timer);
        tipc_sk_remove(tsk);
        if (tsk->connected) {
                skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
@@ -458,13 +468,10 @@ static int tipc_release(struct socket *sock)
                                      tsk_own_node(tsk), tsk_peer_port(tsk),
                                      tsk->portid, TIPC_ERR_NO_PORT);
                if (skb)
-                       tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+                       tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
                tipc_node_remove_conn(net, dnode, tsk->portid);
        }
 
-       /* Discard any remaining (connection-based) messages in receive queue */
-       __skb_queue_purge(&sk->sk_receive_queue);
-
        /* Reject any messages that accumulated in backlog queue */
        sock->state = SS_DISCONNECTING;
        release_sock(sk);
@@ -681,28 +688,29 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
        msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
 
 new_mtu:
-       mtu = tipc_bclink_get_mtu();
+       mtu = tipc_bcast_get_mtu(net);
        rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
        if (unlikely(rc < 0))
                return rc;
 
        do {
-               rc = tipc_bclink_xmit(net, pktchain);
-               if (likely(rc >= 0)) {
-                       rc = dsz;
-                       break;
+               rc = tipc_bcast_xmit(net, pktchain);
+               if (likely(!rc))
+                       return dsz;
+
+               if (rc == -ELINKCONG) {
+                       tsk->link_cong = 1;
+                       rc = tipc_wait_for_sndmsg(sock, &timeo);
+                       if (!rc)
+                               continue;
                }
+               __skb_queue_purge(pktchain);
                if (rc == -EMSGSIZE) {
                        msg->msg_iter = save;
                        goto new_mtu;
                }
-               if (rc != -ELINKCONG)
-                       break;
-               tipc_sk(sk)->link_cong = 1;
-               rc = tipc_wait_for_sndmsg(sock, &timeo);
-               if (rc)
-                       __skb_queue_purge(pktchain);
-       } while (!rc);
+               break;
+       } while (1);
        return rc;
 }
 
@@ -765,35 +773,35 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
- * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
+ * @skb: pointer to message buffer.
  */
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
 {
-       struct tipc_msg *msg = buf_msg(*skb);
+       struct sock *sk = &tsk->sk;
+       struct tipc_msg *hdr = buf_msg(skb);
+       int mtyp = msg_type(hdr);
        int conn_cong;
-       u32 dnode;
-       u32 own_node = tsk_own_node(tsk);
+
        /* Ignore if connection cannot be validated: */
-       if (!tsk_peer_msg(tsk, msg))
+       if (!tsk_peer_msg(tsk, hdr))
                goto exit;
 
        tsk->probing_state = TIPC_CONN_OK;
 
-       if (msg_type(msg) == CONN_ACK) {
+       if (mtyp == CONN_PROBE) {
+               msg_set_type(hdr, CONN_PROBE_REPLY);
+               tipc_sk_respond(sk, skb, TIPC_OK);
+               return;
+       } else if (mtyp == CONN_ACK) {
                conn_cong = tsk_conn_cong(tsk);
-               tsk->sent_unacked -= msg_msgcnt(msg);
+               tsk->sent_unacked -= msg_msgcnt(hdr);
                if (conn_cong)
-                       tsk->sk.sk_write_space(&tsk->sk);
-       } else if (msg_type(msg) == CONN_PROBE) {
-               if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
-                       msg_set_type(msg, CONN_PROBE_REPLY);
-                       return;
-               }
+                       sk->sk_write_space(sk);
+       } else if (mtyp != CONN_PROBE_REPLY) {
+               pr_warn("Received unknown CONN_PROTO msg\n");
        }
-       /* Do nothing if msg_type() == CONN_PROBE_REPLY */
 exit:
-       kfree_skb(*skb);
-       *skb = NULL;
+       kfree_skb(skb);
 }
 
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
@@ -926,24 +934,25 @@ new_mtu:
        do {
                skb = skb_peek(pktchain);
                TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
-               if (likely(rc >= 0)) {
+               rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
+               if (likely(!rc)) {
                        if (sock->state != SS_READY)
                                sock->state = SS_CONNECTING;
-                       rc = dsz;
-                       break;
+                       return dsz;
                }
+               if (rc == -ELINKCONG) {
+                       tsk->link_cong = 1;
+                       rc = tipc_wait_for_sndmsg(sock, &timeo);
+                       if (!rc)
+                               continue;
+               }
+               __skb_queue_purge(pktchain);
                if (rc == -EMSGSIZE) {
                        m->msg_iter = save;
                        goto new_mtu;
                }
-               if (rc != -ELINKCONG)
-                       break;
-               tsk->link_cong = 1;
-               rc = tipc_wait_for_sndmsg(sock, &timeo);
-               if (rc)
-                       __skb_queue_purge(pktchain);
-       } while (!rc);
+               break;
+       } while (1);
 
        return rc;
 }
@@ -1045,15 +1054,16 @@ next:
                return rc;
        do {
                if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_link_xmit(net, pktchain, dnode, portid);
+                       rc = tipc_node_xmit(net, pktchain, dnode, portid);
                        if (likely(!rc)) {
                                tsk->sent_unacked++;
                                sent += send;
                                if (sent == dsz)
-                                       break;
+                                       return dsz;
                                goto next;
                        }
                        if (rc == -EMSGSIZE) {
+                               __skb_queue_purge(pktchain);
                                tsk->max_pkt = tipc_node_get_mtu(net, dnode,
                                                                 portid);
                                m->msg_iter = save;
@@ -1061,13 +1071,13 @@ next:
                        }
                        if (rc != -ELINKCONG)
                                break;
+
                        tsk->link_cong = 1;
                }
                rc = tipc_wait_for_sndpkt(sock, &timeo);
-               if (rc)
-                       __skb_queue_purge(pktchain);
        } while (!rc);
 
+       __skb_queue_purge(pktchain);
        return sent ? sent : rc;
 }
 
@@ -1223,7 +1233,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
                return;
        msg = buf_msg(skb);
        msg_set_msgcnt(msg, ack);
-       tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
+       tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1504,87 +1514,91 @@ static void tipc_data_ready(struct sock *sk)
        rcu_read_unlock();
 }
 
+static void tipc_sock_destruct(struct sock *sk)
+{
+       __skb_queue_purge(&sk->sk_receive_queue);
+}
+
 /**
  * filter_connect - Handle all incoming messages for a connection-based socket
  * @tsk: TIPC socket
  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
- * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
+ * Returns true if everything ok, false otherwise
  */
-static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
+static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 {
        struct sock *sk = &tsk->sk;
        struct net *net = sock_net(sk);
        struct socket *sock = sk->sk_socket;
-       struct tipc_msg *msg = buf_msg(*skb);
-       int retval = -TIPC_ERR_NO_PORT;
+       struct tipc_msg *hdr = buf_msg(skb);
 
-       if (msg_mcast(msg))
-               return retval;
+       if (unlikely(msg_mcast(hdr)))
+               return false;
 
        switch ((int)sock->state) {
        case SS_CONNECTED:
+
                /* Accept only connection-based messages sent by peer */
-               if (tsk_peer_msg(tsk, msg)) {
-                       if (unlikely(msg_errcode(msg))) {
-                               sock->state = SS_DISCONNECTING;
-                               tsk->connected = 0;
-                               /* let timer expire on it's own */
-                               tipc_node_remove_conn(net, tsk_peer_node(tsk),
-                                                     tsk->portid);
-                       }
-                       retval = TIPC_OK;
+               if (unlikely(!tsk_peer_msg(tsk, hdr)))
+                       return false;
+
+               if (unlikely(msg_errcode(hdr))) {
+                       sock->state = SS_DISCONNECTING;
+                       tsk->connected = 0;
+                       /* Let timer expire on it's own */
+                       tipc_node_remove_conn(net, tsk_peer_node(tsk),
+                                             tsk->portid);
                }
-               break;
+               return true;
+
        case SS_CONNECTING:
-               /* Accept only ACK or NACK message */
 
-               if (unlikely(!msg_connected(msg)))
-                       break;
+               /* Accept only ACK or NACK message */
+               if (unlikely(!msg_connected(hdr)))
+                       return false;
 
-               if (unlikely(msg_errcode(msg))) {
+               if (unlikely(msg_errcode(hdr))) {
                        sock->state = SS_DISCONNECTING;
                        sk->sk_err = ECONNREFUSED;
-                       retval = TIPC_OK;
-                       break;
+                       return true;
                }
 
-               if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
+               if (unlikely(!msg_isdata(hdr))) {
                        sock->state = SS_DISCONNECTING;
                        sk->sk_err = EINVAL;
-                       retval = TIPC_OK;
-                       break;
+                       return true;
                }
 
-               tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
-               msg_set_importance(&tsk->phdr, msg_importance(msg));
+               tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
+               msg_set_importance(&tsk->phdr, msg_importance(hdr));
                sock->state = SS_CONNECTED;
 
-               /* If an incoming message is an 'ACK-', it should be
-                * discarded here because it doesn't contain useful
-                * data. In addition, we should try to wake up
-                * connect() routine if sleeping.
-                */
-               if (msg_data_sz(msg) == 0) {
-                       kfree_skb(*skb);
-                       *skb = NULL;
-                       if (waitqueue_active(sk_sleep(sk)))
-                               wake_up_interruptible(sk_sleep(sk));
-               }
-               retval = TIPC_OK;
-               break;
+               /* If 'ACK+' message, add to socket receive queue */
+               if (msg_data_sz(hdr))
+                       return true;
+
+               /* If empty 'ACK-' message, wake up sleeping connect() */
+               if (waitqueue_active(sk_sleep(sk)))
+                       wake_up_interruptible(sk_sleep(sk));
+
+               /* 'ACK-' message is neither accepted nor rejected: */
+               msg_set_dest_droppable(hdr, 1);
+               return false;
+
        case SS_LISTENING:
        case SS_UNCONNECTED:
+
                /* Accept only SYN message */
-               if (!msg_connected(msg) && !(msg_errcode(msg)))
-                       retval = TIPC_OK;
+               if (!msg_connected(hdr) && !(msg_errcode(hdr)))
+                       return true;
                break;
        case SS_DISCONNECTING:
                break;
        default:
                pr_err("Unknown socket state %u\n", sock->state);
        }
-       return retval;
+       return false;
 }
 
 /**
@@ -1619,61 +1633,70 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
 /**
  * filter_rcv - validate incoming message
  * @sk: socket
- * @skb: pointer to message. Set to NULL if buffer is consumed.
+ * @skb: pointer to message.
  *
  * Enqueues message on receive queue if acceptable; optionally handles
  * disconnect indication for a connected socket.
  *
  * Called with socket lock already taken
  *
- * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
+ * Returns true if message was added to socket receive queue, otherwise false
  */
-static int filter_rcv(struct sock *sk, struct sk_buff **skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
 {
        struct socket *sock = sk->sk_socket;
        struct tipc_sock *tsk = tipc_sk(sk);
-       struct tipc_msg *msg = buf_msg(*skb);
-       unsigned int limit = rcvbuf_limit(sk, *skb);
-       int rc = TIPC_OK;
+       struct tipc_msg *hdr = buf_msg(skb);
+       unsigned int limit = rcvbuf_limit(sk, skb);
+       int err = TIPC_OK;
+       int usr = msg_user(hdr);
 
-       if (unlikely(msg_user(msg) == CONN_MANAGER)) {
+       if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
                tipc_sk_proto_rcv(tsk, skb);
-               return TIPC_OK;
+               return false;
        }
 
-       if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
-               kfree_skb(*skb);
+       if (unlikely(usr == SOCK_WAKEUP)) {
+               kfree_skb(skb);
                tsk->link_cong = 0;
                sk->sk_write_space(sk);
-               *skb = NULL;
-               return TIPC_OK;
+               return false;
        }
 
-       /* Reject message if it is wrong sort of message for socket */
-       if (msg_type(msg) > TIPC_DIRECT_MSG)
-               return -TIPC_ERR_NO_PORT;
+       /* Drop if illegal message type */
+       if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
+               kfree_skb(skb);
+               return false;
+       }
 
-       if (sock->state == SS_READY) {
-               if (msg_connected(msg))
-                       return -TIPC_ERR_NO_PORT;
-       } else {
-               rc = filter_connect(tsk, skb);
-               if (rc != TIPC_OK || !*skb)
-                       return rc;
+       /* Reject if wrong message type for current socket state */
+       if (unlikely(sock->state == SS_READY)) {
+               if (msg_connected(hdr)) {
+                       err = TIPC_ERR_NO_PORT;
+                       goto reject;
+               }
+       } else if (unlikely(!filter_connect(tsk, skb))) {
+               err = TIPC_ERR_NO_PORT;
+               goto reject;
        }
 
        /* Reject message if there isn't room to queue it */
-       if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
-               return -TIPC_ERR_OVERLOAD;
+       if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
+               err = TIPC_ERR_OVERLOAD;
+               goto reject;
+       }
 
        /* Enqueue message */
-       TIPC_SKB_CB(*skb)->handle = NULL;
-       __skb_queue_tail(&sk->sk_receive_queue, *skb);
-       skb_set_owner_r(*skb, sk);
+       TIPC_SKB_CB(skb)->handle = NULL;
+       __skb_queue_tail(&sk->sk_receive_queue, skb);
+       skb_set_owner_r(skb, sk);
 
        sk->sk_data_ready(sk);
-       *skb = NULL;
-       return TIPC_OK;
+       return true;
+
+reject:
+       tipc_sk_respond(sk, skb, err);
+       return false;
 }
 
 /**
@@ -1687,22 +1710,10 @@ static int filter_rcv(struct sock *sk, struct sk_buff **skb)
  */
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-       int err;
-       atomic_t *dcnt;
-       u32 dnode;
-       struct tipc_sock *tsk = tipc_sk(sk);
-       struct net *net = sock_net(sk);
-       uint truesize = skb->truesize;
+       unsigned int truesize = skb->truesize;
 
-       err = filter_rcv(sk, &skb);
-       if (likely(!skb)) {
-               dcnt = &tsk->dupl_rcvcnt;
-               if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
-                       atomic_add(truesize, dcnt);
-               return 0;
-       }
-       if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
-               tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+       if (likely(filter_rcv(sk, skb)))
+               atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
        return 0;
 }
 
@@ -1712,45 +1723,43 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
  * @inputq: list of incoming buffers with potentially different destinations
  * @sk: socket where the buffers should be enqueued
  * @dport: port number for the socket
- * @_skb: returned buffer to be forwarded or rejected, if applicable
  *
  * Caller must hold socket lock
- *
- * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
- * or -TIPC_ERR_NO_PORT
  */
-static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
-                          u32 dport, struct sk_buff **_skb)
+static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+                           u32 dport)
 {
        unsigned int lim;
        atomic_t *dcnt;
-       int err;
        struct sk_buff *skb;
        unsigned long time_limit = jiffies + 2;
 
        while (skb_queue_len(inputq)) {
                if (unlikely(time_after_eq(jiffies, time_limit)))
-                       return TIPC_OK;
+                       return;
+
                skb = tipc_skb_dequeue(inputq, dport);
                if (unlikely(!skb))
-                       return TIPC_OK;
+                       return;
+
+               /* Add message directly to receive queue if possible */
                if (!sock_owned_by_user(sk)) {
-                       err = filter_rcv(sk, &skb);
-                       if (likely(!skb))
-                               continue;
-                       *_skb = skb;
-                       return err;
+                       filter_rcv(sk, skb);
+                       continue;
                }
+
+               /* Try backlog, compensating for double-counted bytes */
                dcnt = &tipc_sk(sk)->dupl_rcvcnt;
                if (sk->sk_backlog.len)
                        atomic_set(dcnt, 0);
                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
                if (likely(!sk_add_backlog(sk, skb, lim)))
                        continue;
-               *_skb = skb;
-               return -TIPC_ERR_OVERLOAD;
+
+               /* Overload => reject message back to sender */
+               tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+               break;
        }
-       return TIPC_OK;
 }
 
 /**
@@ -1758,49 +1767,46 @@ static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
  * @inputq: buffer list containing the buffers
  * Consumes all buffers in list until inputq is empty
  * Note: may be called in multiple threads referring to the same queue
- * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
- * Only node local calls check the return value, sending single-buffer queues
  */
-int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
        u32 dnode, dport = 0;
        int err;
-       struct sk_buff *skb;
        struct tipc_sock *tsk;
-       struct tipc_net *tn;
        struct sock *sk;
+       struct sk_buff *skb;
 
        while (skb_queue_len(inputq)) {
-               err = -TIPC_ERR_NO_PORT;
-               skb = NULL;
                dport = tipc_skb_peek_port(inputq, dport);
                tsk = tipc_sk_lookup(net, dport);
+
                if (likely(tsk)) {
                        sk = &tsk->sk;
                        if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
-                               err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+                               tipc_sk_enqueue(inputq, sk, dport);
                                spin_unlock_bh(&sk->sk_lock.slock);
-                               dport = 0;
                        }
                        sock_put(sk);
-               } else {
-                       skb = tipc_skb_dequeue(inputq, dport);
-               }
-               if (likely(!skb))
                        continue;
-               if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
-                       goto xmit;
-               if (!err) {
-                       dnode = msg_destnode(buf_msg(skb));
-                       goto xmit;
                }
-               tn = net_generic(net, tipc_net_id);
-               if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+
+               /* No destination socket => dequeue skb if still there */
+               skb = tipc_skb_dequeue(inputq, dport);
+               if (!skb)
+                       return;
+
+               /* Try secondary lookup if unresolved named message */
+               err = TIPC_ERR_NO_PORT;
+               if (tipc_msg_lookup_dest(net, skb, &err))
+                       goto xmit;
+
+               /* Prepare for message rejection */
+               if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
                        continue;
 xmit:
-               tipc_link_xmit_skb(net, skb, dnode, dport);
+               dnode = msg_destnode(buf_msg(skb));
+               tipc_node_xmit_skb(net, skb, dnode, dport);
        }
-       return err ? -EHOSTUNREACH : 0;
 }
 
 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2069,7 +2075,10 @@ static int tipc_shutdown(struct socket *sock, int how)
        struct net *net = sock_net(sk);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct sk_buff *skb;
-       u32 dnode;
+       u32 dnode = tsk_peer_node(tsk);
+       u32 dport = tsk_peer_port(tsk);
+       u32 onode = tipc_own_addr(net);
+       u32 oport = tsk->portid;
        int res;
 
        if (how != SHUT_RDWR)
@@ -2082,6 +2091,8 @@ static int tipc_shutdown(struct socket *sock, int how)
        case SS_CONNECTED:
 
 restart:
+               dnode = tsk_peer_node(tsk);
+
                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
                skb = __skb_dequeue(&sk->sk_receive_queue);
                if (skb) {
@@ -2089,19 +2100,13 @@ restart:
                                kfree_skb(skb);
                                goto restart;
                        }
-                       if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
-                                            TIPC_CONN_SHUTDOWN))
-                               tipc_link_xmit_skb(net, skb, dnode,
-                                                  tsk->portid);
+                       tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
                } else {
-                       dnode = tsk_peer_node(tsk);
-
                        skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                              TIPC_CONN_MSG, SHORT_H_SIZE,
-                                             0, dnode, tsk_own_node(tsk),
-                                             tsk_peer_port(tsk),
-                                             tsk->portid, TIPC_CONN_SHUTDOWN);
-                       tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+                                             0, dnode, onode, dport, oport,
+                                             TIPC_CONN_SHUTDOWN);
+                       tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
                }
                tsk->connected = 0;
                sock->state = SS_DISCONNECTING;
@@ -2163,7 +2168,7 @@ static void tipc_sk_timeout(unsigned long data)
        }
        bh_unlock_sock(sk);
        if (skb)
-               tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
+               tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
 exit:
        sock_put(sk);
 }