These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / net / unix / af_unix.c
index 0643059..898a53a 100644 (file)
@@ -140,12 +140,17 @@ static struct hlist_head *unix_sockets_unbound(void *addr)
 #ifdef CONFIG_SECURITY_NETWORK
 static void unix_get_secdata(struct scm_cookie *scm, struct sk_buff *skb)
 {
-       memcpy(UNIXSID(skb), &scm->secid, sizeof(u32));
+       UNIXCB(skb).secid = scm->secid;
 }
 
 static inline void unix_set_secdata(struct scm_cookie *scm, struct sk_buff *skb)
 {
-       scm->secid = *UNIXSID(skb);
+       scm->secid = UNIXCB(skb).secid;
+}
+
+static inline bool unix_secdata_eq(struct scm_cookie *scm, struct sk_buff *skb)
+{
+       return (scm->secid == UNIXCB(skb).secid);
 }
 #else
 static inline void unix_get_secdata(struct scm_cookie *scm, struct sk_buff *skb)
@@ -153,6 +158,11 @@ static inline void unix_get_secdata(struct scm_cookie *scm, struct sk_buff *skb)
 
 static inline void unix_set_secdata(struct scm_cookie *scm, struct sk_buff *skb)
 { }
+
+static inline bool unix_secdata_eq(struct scm_cookie *scm, struct sk_buff *skb)
+{
+       return true;
+}
 #endif /* CONFIG_SECURITY_NETWORK */
 
 /*
@@ -316,9 +326,122 @@ found:
        return s;
 }
 
-static inline int unix_writable(struct sock *sk)
+/* Support code for asymmetrically connected dgram sockets
+ *
+ * If a datagram socket is connected to a socket not itself connected
+ * to the first socket (eg, /dev/log), clients may only enqueue more
+ * messages if the present receive queue of the server socket is not
+ * "too large". This means there's a second writeability condition
+ * poll and sendmsg need to test. The dgram recv code will do a wake
+ * up on the peer_wait wait queue of a socket upon reception of a
+ * datagram which needs to be propagated to sleeping would-be writers
+ * since these might not have sent anything so far. This can't be
+ * accomplished via poll_wait because the lifetime of the server
+ * socket might be less than that of its clients if these break their
+ * association with it or if the server socket is closed while clients
+ * are still connected to it and there's no way to inform "a polling
+ * implementation" that it should let go of a certain wait queue
+ *
+ * In order to propagate a wake up, a wait_queue_t of the client
+ * socket is enqueued on the peer_wait queue of the server socket
+ * whose wake function does a wake_up on the ordinary client socket
+ * wait queue. This connection is established whenever a write (or
+ * poll for write) hit the flow control condition and broken when the
+ * association to the server socket is dissolved or after a wake up
+ * was relayed.
+ */
+
+static int unix_dgram_peer_wake_relay(wait_queue_t *q, unsigned mode, int flags,
+                                     void *key)
+{
+       struct unix_sock *u;
+       wait_queue_head_t *u_sleep;
+
+       u = container_of(q, struct unix_sock, peer_wake);
+
+       __remove_wait_queue(&unix_sk(u->peer_wake.private)->peer_wait,
+                           q);
+       u->peer_wake.private = NULL;
+
+       /* relaying can only happen while the wq still exists */
+       u_sleep = sk_sleep(&u->sk);
+       if (u_sleep)
+               wake_up_interruptible_poll(u_sleep, key);
+
+       return 0;
+}
+
+static int unix_dgram_peer_wake_connect(struct sock *sk, struct sock *other)
 {
-       return (atomic_read(&sk->sk_wmem_alloc) << 2) <= sk->sk_sndbuf;
+       struct unix_sock *u, *u_other;
+       int rc;
+
+       u = unix_sk(sk);
+       u_other = unix_sk(other);
+       rc = 0;
+       spin_lock(&u_other->peer_wait.lock);
+
+       if (!u->peer_wake.private) {
+               u->peer_wake.private = other;
+               __add_wait_queue(&u_other->peer_wait, &u->peer_wake);
+
+               rc = 1;
+       }
+
+       spin_unlock(&u_other->peer_wait.lock);
+       return rc;
+}
+
+static void unix_dgram_peer_wake_disconnect(struct sock *sk,
+                                           struct sock *other)
+{
+       struct unix_sock *u, *u_other;
+
+       u = unix_sk(sk);
+       u_other = unix_sk(other);
+       spin_lock(&u_other->peer_wait.lock);
+
+       if (u->peer_wake.private == other) {
+               __remove_wait_queue(&u_other->peer_wait, &u->peer_wake);
+               u->peer_wake.private = NULL;
+       }
+
+       spin_unlock(&u_other->peer_wait.lock);
+}
+
+static void unix_dgram_peer_wake_disconnect_wakeup(struct sock *sk,
+                                                  struct sock *other)
+{
+       unix_dgram_peer_wake_disconnect(sk, other);
+       wake_up_interruptible_poll(sk_sleep(sk),
+                                  POLLOUT |
+                                  POLLWRNORM |
+                                  POLLWRBAND);
+}
+
+/* preconditions:
+ *     - unix_peer(sk) == other
+ *     - association is stable
+ */
+static int unix_dgram_peer_wake_me(struct sock *sk, struct sock *other)
+{
+       int connected;
+
+       connected = unix_dgram_peer_wake_connect(sk, other);
+
+       if (unix_recvq_full(other))
+               return 1;
+
+       if (connected)
+               unix_dgram_peer_wake_disconnect(sk, other);
+
+       return 0;
+}
+
+static int unix_writable(const struct sock *sk)
+{
+       return sk->sk_state != TCP_LISTEN &&
+              (atomic_read(&sk->sk_wmem_alloc) << 2) <= sk->sk_sndbuf;
 }
 
 static void unix_write_space(struct sock *sk)
@@ -420,6 +543,8 @@ static void unix_release_sock(struct sock *sk, int embrion)
                        skpair->sk_state_change(skpair);
                        sk_wake_async(skpair, SOCK_WAKE_WAITD, POLL_HUP);
                }
+
+               unix_dgram_peer_wake_disconnect(sk, skpair);
                sock_put(skpair); /* It may now die */
                unix_peer(sk) = NULL;
        }
@@ -430,6 +555,7 @@ static void unix_release_sock(struct sock *sk, int embrion)
                if (state == TCP_LISTEN)
                        unix_release_sock(skb->sk, 1);
                /* passed fds are erased in the kfree_skb hook        */
+               UNIXCB(skb).consumed = skb->len;
                kfree_skb(skb);
        }
 
@@ -518,6 +644,11 @@ static int unix_ioctl(struct socket *, unsigned int, unsigned long);
 static int unix_shutdown(struct socket *, int);
 static int unix_stream_sendmsg(struct socket *, struct msghdr *, size_t);
 static int unix_stream_recvmsg(struct socket *, struct msghdr *, size_t, int);
+static ssize_t unix_stream_sendpage(struct socket *, struct page *, int offset,
+                                   size_t size, int flags);
+static ssize_t unix_stream_splice_read(struct socket *,  loff_t *ppos,
+                                      struct pipe_inode_info *, size_t size,
+                                      unsigned int flags);
 static int unix_dgram_sendmsg(struct socket *, struct msghdr *, size_t);
 static int unix_dgram_recvmsg(struct socket *, struct msghdr *, size_t, int);
 static int unix_dgram_connect(struct socket *, struct sockaddr *,
@@ -558,7 +689,8 @@ static const struct proto_ops unix_stream_ops = {
        .sendmsg =      unix_stream_sendmsg,
        .recvmsg =      unix_stream_recvmsg,
        .mmap =         sock_no_mmap,
-       .sendpage =     sock_no_sendpage,
+       .sendpage =     unix_stream_sendpage,
+       .splice_read =  unix_stream_splice_read,
        .set_peek_off = unix_set_peek_off,
 };
 
@@ -620,7 +752,7 @@ static struct proto unix_proto = {
  */
 static struct lock_class_key af_unix_sk_receive_queue_lock_key;
 
-static struct sock *unix_create1(struct net *net, struct socket *sock)
+static struct sock *unix_create1(struct net *net, struct socket *sock, int kern)
 {
        struct sock *sk = NULL;
        struct unix_sock *u;
@@ -629,7 +761,7 @@ static struct sock *unix_create1(struct net *net, struct socket *sock)
        if (atomic_long_read(&unix_nr_socks) > 2 * get_max_files())
                goto out;
 
-       sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_proto);
+       sk = sk_alloc(net, PF_UNIX, GFP_KERNEL, &unix_proto, kern);
        if (!sk)
                goto out;
 
@@ -648,6 +780,7 @@ static struct sock *unix_create1(struct net *net, struct socket *sock)
        INIT_LIST_HEAD(&u->link);
        mutex_init(&u->readlock); /* single task reading lock */
        init_waitqueue_head(&u->peer_wait);
+       init_waitqueue_func_entry(&u->peer_wake, unix_dgram_peer_wake_relay);
        unix_insert_socket(unix_sockets_unbound(sk), sk);
 out:
        if (sk == NULL)
@@ -688,7 +821,7 @@ static int unix_create(struct net *net, struct socket *sock, int protocol,
                return -ESOCKTNOSUPPORT;
        }
 
-       return unix_create1(net, sock) ? 0 : -ENOMEM;
+       return unix_create1(net, sock, kern) ? 0 : -ENOMEM;
 }
 
 static int unix_release(struct socket *sock)
@@ -820,32 +953,20 @@ fail:
        return NULL;
 }
 
-static int unix_mknod(const char *sun_path, umode_t mode, struct path *res)
+static int unix_mknod(struct dentry *dentry, struct path *path, umode_t mode,
+                     struct path *res)
 {
-       struct dentry *dentry;
-       struct path path;
-       int err = 0;
-       /*
-        * Get the parent directory, calculate the hash for last
-        * component.
-        */
-       dentry = kern_path_create(AT_FDCWD, sun_path, &path, 0);
-       err = PTR_ERR(dentry);
-       if (IS_ERR(dentry))
-               return err;
+       int err;
 
-       /*
-        * All right, let's create it.
-        */
-       err = security_path_mknod(&path, dentry, mode, 0);
+       err = security_path_mknod(path, dentry, mode, 0);
        if (!err) {
-               err = vfs_mknod(d_inode(path.dentry), dentry, mode, 0);
+               err = vfs_mknod(d_inode(path->dentry), dentry, mode, 0);
                if (!err) {
-                       res->mnt = mntget(path.mnt);
+                       res->mnt = mntget(path->mnt);
                        res->dentry = dget(dentry);
                }
        }
-       done_path_create(&path, dentry);
+
        return err;
 }
 
@@ -856,10 +977,12 @@ static int unix_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
        struct unix_sock *u = unix_sk(sk);
        struct sockaddr_un *sunaddr = (struct sockaddr_un *)uaddr;
        char *sun_path = sunaddr->sun_path;
-       int err;
+       int err, name_err;
        unsigned int hash;
        struct unix_address *addr;
        struct hlist_head *list;
+       struct path path;
+       struct dentry *dentry;
 
        err = -EINVAL;
        if (sunaddr->sun_family != AF_UNIX)
@@ -875,14 +998,34 @@ static int unix_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
                goto out;
        addr_len = err;
 
+       name_err = 0;
+       dentry = NULL;
+       if (sun_path[0]) {
+               /* Get the parent directory, calculate the hash for last
+                * component.
+                */
+               dentry = kern_path_create(AT_FDCWD, sun_path, &path, 0);
+
+               if (IS_ERR(dentry)) {
+                       /* delay report until after 'already bound' check */
+                       name_err = PTR_ERR(dentry);
+                       dentry = NULL;
+               }
+       }
+
        err = mutex_lock_interruptible(&u->readlock);
        if (err)
-               goto out;
+               goto out_path;
 
        err = -EINVAL;
        if (u->addr)
                goto out_up;
 
+       if (name_err) {
+               err = name_err == -EEXIST ? -EADDRINUSE : name_err;
+               goto out_up;
+       }
+
        err = -ENOMEM;
        addr = kmalloc(sizeof(*addr)+addr_len, GFP_KERNEL);
        if (!addr)
@@ -893,11 +1036,11 @@ static int unix_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
        addr->hash = hash ^ sk->sk_type;
        atomic_set(&addr->refcnt, 1);
 
-       if (sun_path[0]) {
-               struct path path;
+       if (dentry) {
+               struct path u_path;
                umode_t mode = S_IFSOCK |
                       (SOCK_INODE(sock)->i_mode & ~current_umask());
-               err = unix_mknod(sun_path, mode, &path);
+               err = unix_mknod(dentry, &path, mode, &u_path);
                if (err) {
                        if (err == -EEXIST)
                                err = -EADDRINUSE;
@@ -905,9 +1048,9 @@ static int unix_bind(struct socket *sock, struct sockaddr *uaddr, int addr_len)
                        goto out_up;
                }
                addr->hash = UNIX_HASH_SIZE;
-               hash = d_backing_inode(path.dentry)->i_ino & (UNIX_HASH_SIZE-1);
+               hash = d_backing_inode(dentry)->i_ino & (UNIX_HASH_SIZE - 1);
                spin_lock(&unix_table_lock);
-               u->path = path;
+               u->path = u_path;
                list = &unix_socket_table[hash];
        } else {
                spin_lock(&unix_table_lock);
@@ -930,6 +1073,10 @@ out_unlock:
        spin_unlock(&unix_table_lock);
 out_up:
        mutex_unlock(&u->readlock);
+out_path:
+       if (dentry)
+               done_path_create(&path, dentry);
+
 out:
        return err;
 }
@@ -1015,6 +1162,8 @@ restart:
        if (unix_peer(sk)) {
                struct sock *old_peer = unix_peer(sk);
                unix_peer(sk) = other;
+               unix_dgram_peer_wake_disconnect_wakeup(sk, old_peer);
+
                unix_state_double_unlock(sk, other);
 
                if (other != old_peer)
@@ -1088,7 +1237,7 @@ static int unix_stream_connect(struct socket *sock, struct sockaddr *uaddr,
        err = -ENOMEM;
 
        /* create new sock for complete connection */
-       newsk = unix_create1(sock_net(sk), NULL);
+       newsk = unix_create1(sock_net(sk), NULL, 0);
        if (newsk == NULL)
                goto out;
 
@@ -1347,7 +1496,7 @@ static void unix_detach_fds(struct scm_cookie *scm, struct sk_buff *skb)
        UNIXCB(skb).fp = NULL;
 
        for (i = scm->fp->count-1; i >= 0; i--)
-               unix_notinflight(scm->fp->fp[i]);
+               unix_notinflight(scm->fp->user, scm->fp->fp[i]);
 }
 
 static void unix_destruct_scm(struct sk_buff *skb)
@@ -1364,6 +1513,21 @@ static void unix_destruct_scm(struct sk_buff *skb)
        sock_wfree(skb);
 }
 
+/*
+ * The "user->unix_inflight" variable is protected by the garbage
+ * collection lock, and we just read it locklessly here. If you go
+ * over the limit, there might be a tiny race in actually noticing
+ * it across threads. Tough.
+ */
+static inline bool too_many_unix_fds(struct task_struct *p)
+{
+       struct user_struct *user = current_user();
+
+       if (unlikely(user->unix_inflight > task_rlimit(p, RLIMIT_NOFILE)))
+               return !capable(CAP_SYS_RESOURCE) && !capable(CAP_SYS_ADMIN);
+       return false;
+}
+
 #define MAX_RECURSION_LEVEL 4
 
 static int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb)
@@ -1372,6 +1536,9 @@ static int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb)
        unsigned char max_level = 0;
        int unix_sock_count = 0;
 
+       if (too_many_unix_fds(current))
+               return -ETOOMANYREFS;
+
        for (i = scm->fp->count - 1; i >= 0; i--) {
                struct sock *sk = unix_get_socket(scm->fp->fp[i]);
 
@@ -1393,10 +1560,8 @@ static int unix_attach_fds(struct scm_cookie *scm, struct sk_buff *skb)
        if (!UNIXCB(skb).fp)
                return -ENOMEM;
 
-       if (unix_sock_count) {
-               for (i = scm->fp->count - 1; i >= 0; i--)
-                       unix_inflight(scm->fp->fp[i]);
-       }
+       for (i = scm->fp->count - 1; i >= 0; i--)
+               unix_inflight(scm->fp->user, scm->fp->fp[i]);
        return max_level;
 }
 
@@ -1408,6 +1573,7 @@ static int unix_scm_to_skb(struct scm_cookie *scm, struct sk_buff *skb, bool sen
        UNIXCB(skb).uid = scm->creds.uid;
        UNIXCB(skb).gid = scm->creds.gid;
        UNIXCB(skb).fp = NULL;
+       unix_get_secdata(scm, skb);
        if (scm->fp && send_fds)
                err = unix_attach_fds(scm, skb);
 
@@ -1415,6 +1581,14 @@ static int unix_scm_to_skb(struct scm_cookie *scm, struct sk_buff *skb, bool sen
        return err;
 }
 
+static bool unix_passcred_enabled(const struct socket *sock,
+                                 const struct sock *other)
+{
+       return test_bit(SOCK_PASSCRED, &sock->flags) ||
+              !other->sk_socket ||
+              test_bit(SOCK_PASSCRED, &other->sk_socket->flags);
+}
+
 /*
  * Some apps rely on write() giving SCM_CREDENTIALS
  * We include credentials if source or destination socket
@@ -1425,14 +1599,41 @@ static void maybe_add_creds(struct sk_buff *skb, const struct socket *sock,
 {
        if (UNIXCB(skb).pid)
                return;
-       if (test_bit(SOCK_PASSCRED, &sock->flags) ||
-           !other->sk_socket ||
-           test_bit(SOCK_PASSCRED, &other->sk_socket->flags)) {
+       if (unix_passcred_enabled(sock, other)) {
                UNIXCB(skb).pid  = get_pid(task_tgid(current));
                current_uid_gid(&UNIXCB(skb).uid, &UNIXCB(skb).gid);
        }
 }
 
+static int maybe_init_creds(struct scm_cookie *scm,
+                           struct socket *socket,
+                           const struct sock *other)
+{
+       int err;
+       struct msghdr msg = { .msg_controllen = 0 };
+
+       err = scm_send(socket, &msg, scm, false);
+       if (err)
+               return err;
+
+       if (unix_passcred_enabled(socket, other)) {
+               scm->pid = get_pid(task_tgid(current));
+               current_uid_gid(&scm->creds.uid, &scm->creds.gid);
+       }
+       return err;
+}
+
+static bool unix_skb_scm_eq(struct sk_buff *skb,
+                           struct scm_cookie *scm)
+{
+       const struct unix_skb_parms *u = &UNIXCB(skb);
+
+       return u->pid == scm->pid &&
+              uid_eq(u->uid, scm->creds.uid) &&
+              gid_eq(u->gid, scm->creds.gid) &&
+              unix_secdata_eq(scm, skb);
+}
+
 /*
  *     Send AF_UNIX data.
  */
@@ -1453,6 +1654,7 @@ static int unix_dgram_sendmsg(struct socket *sock, struct msghdr *msg,
        struct scm_cookie scm;
        int max_level;
        int data_len = 0;
+       int sk_locked;
 
        wait_for_unix_gc();
        err = scm_send(sock, msg, &scm, false);
@@ -1503,7 +1705,6 @@ static int unix_dgram_sendmsg(struct socket *sock, struct msghdr *msg,
        if (err < 0)
                goto out_free;
        max_level = err + 1;
-       unix_get_secdata(&scm, skb);
 
        skb_put(skb, len - data_len);
        skb->data_len = data_len;
@@ -1532,12 +1733,14 @@ restart:
                goto out_free;
        }
 
+       sk_locked = 0;
        unix_state_lock(other);
+restart_locked:
        err = -EPERM;
        if (!unix_may_send(sk, other))
                goto out_unlock;
 
-       if (sock_flag(other, SOCK_DEAD)) {
+       if (unlikely(sock_flag(other, SOCK_DEAD))) {
                /*
                 *      Check with 1003.1g - what should
                 *      datagram error
@@ -1545,10 +1748,14 @@ restart:
                unix_state_unlock(other);
                sock_put(other);
 
+               if (!sk_locked)
+                       unix_state_lock(sk);
+
                err = 0;
-               unix_state_lock(sk);
                if (unix_peer(sk) == other) {
                        unix_peer(sk) = NULL;
+                       unix_dgram_peer_wake_disconnect_wakeup(sk, other);
+
                        unix_state_unlock(sk);
 
                        unix_dgram_disconnected(sk, other);
@@ -1574,21 +1781,43 @@ restart:
                        goto out_unlock;
        }
 
-       if (unix_peer(other) != sk && unix_recvq_full(other)) {
-               if (!timeo) {
-                       err = -EAGAIN;
-                       goto out_unlock;
+       /* other == sk && unix_peer(other) != sk if
+        * - unix_peer(sk) == NULL, destination address bound to sk
+        * - unix_peer(sk) == sk by time of get but disconnected before lock
+        */
+       if (other != sk &&
+           unlikely(unix_peer(other) != sk && unix_recvq_full(other))) {
+               if (timeo) {
+                       timeo = unix_wait_for_peer(other, timeo);
+
+                       err = sock_intr_errno(timeo);
+                       if (signal_pending(current))
+                               goto out_free;
+
+                       goto restart;
                }
 
-               timeo = unix_wait_for_peer(other, timeo);
+               if (!sk_locked) {
+                       unix_state_unlock(other);
+                       unix_state_double_lock(sk, other);
+               }
 
-               err = sock_intr_errno(timeo);
-               if (signal_pending(current))
-                       goto out_free;
+               if (unix_peer(sk) != other ||
+                   unix_dgram_peer_wake_me(sk, other)) {
+                       err = -EAGAIN;
+                       sk_locked = 1;
+                       goto out_unlock;
+               }
 
-               goto restart;
+               if (!sk_locked) {
+                       sk_locked = 1;
+                       goto restart_locked;
+               }
        }
 
+       if (unlikely(sk_locked))
+               unix_state_unlock(sk);
+
        if (sock_flag(other, SOCK_RCVTSTAMP))
                __net_timestamp(skb);
        maybe_add_creds(skb, sock, other);
@@ -1602,6 +1831,8 @@ restart:
        return len;
 
 out_unlock:
+       if (sk_locked)
+               unix_state_unlock(sk);
        unix_state_unlock(other);
 out_free:
        kfree_skb(skb);
@@ -1720,6 +1951,122 @@ out_err:
        return sent ? : err;
 }
 
+static ssize_t unix_stream_sendpage(struct socket *socket, struct page *page,
+                                   int offset, size_t size, int flags)
+{
+       int err;
+       bool send_sigpipe = false;
+       bool init_scm = true;
+       struct scm_cookie scm;
+       struct sock *other, *sk = socket->sk;
+       struct sk_buff *skb, *newskb = NULL, *tail = NULL;
+
+       if (flags & MSG_OOB)
+               return -EOPNOTSUPP;
+
+       other = unix_peer(sk);
+       if (!other || sk->sk_state != TCP_ESTABLISHED)
+               return -ENOTCONN;
+
+       if (false) {
+alloc_skb:
+               unix_state_unlock(other);
+               mutex_unlock(&unix_sk(other)->readlock);
+               newskb = sock_alloc_send_pskb(sk, 0, 0, flags & MSG_DONTWAIT,
+                                             &err, 0);
+               if (!newskb)
+                       goto err;
+       }
+
+       /* we must acquire readlock as we modify already present
+        * skbs in the sk_receive_queue and mess with skb->len
+        */
+       err = mutex_lock_interruptible(&unix_sk(other)->readlock);
+       if (err) {
+               err = flags & MSG_DONTWAIT ? -EAGAIN : -ERESTARTSYS;
+               goto err;
+       }
+
+       if (sk->sk_shutdown & SEND_SHUTDOWN) {
+               err = -EPIPE;
+               send_sigpipe = true;
+               goto err_unlock;
+       }
+
+       unix_state_lock(other);
+
+       if (sock_flag(other, SOCK_DEAD) ||
+           other->sk_shutdown & RCV_SHUTDOWN) {
+               err = -EPIPE;
+               send_sigpipe = true;
+               goto err_state_unlock;
+       }
+
+       if (init_scm) {
+               err = maybe_init_creds(&scm, socket, other);
+               if (err)
+                       goto err_state_unlock;
+               init_scm = false;
+       }
+
+       skb = skb_peek_tail(&other->sk_receive_queue);
+       if (tail && tail == skb) {
+               skb = newskb;
+       } else if (!skb || !unix_skb_scm_eq(skb, &scm)) {
+               if (newskb) {
+                       skb = newskb;
+               } else {
+                       tail = skb;
+                       goto alloc_skb;
+               }
+       } else if (newskb) {
+               /* this is fast path, we don't necessarily need to
+                * call to kfree_skb even though with newskb == NULL
+                * this - does no harm
+                */
+               consume_skb(newskb);
+               newskb = NULL;
+       }
+
+       if (skb_append_pagefrags(skb, page, offset, size)) {
+               tail = skb;
+               goto alloc_skb;
+       }
+
+       skb->len += size;
+       skb->data_len += size;
+       skb->truesize += size;
+       atomic_add(size, &sk->sk_wmem_alloc);
+
+       if (newskb) {
+               err = unix_scm_to_skb(&scm, skb, false);
+               if (err)
+                       goto err_state_unlock;
+               spin_lock(&other->sk_receive_queue.lock);
+               __skb_queue_tail(&other->sk_receive_queue, newskb);
+               spin_unlock(&other->sk_receive_queue.lock);
+       }
+
+       unix_state_unlock(other);
+       mutex_unlock(&unix_sk(other)->readlock);
+
+       other->sk_data_ready(other);
+       scm_destroy(&scm);
+       return size;
+
+err_state_unlock:
+       unix_state_unlock(other);
+err_unlock:
+       mutex_unlock(&unix_sk(other)->readlock);
+err:
+       kfree_skb(newskb);
+       if (send_sigpipe && !(flags & MSG_NOSIGNAL))
+               send_sig(SIGPIPE, current, 0);
+       if (!init_scm)
+               scm_destroy(&scm);
+       return err;
+}
+
 static int unix_seqpacket_sendmsg(struct socket *sock, struct msghdr *msg,
                                  size_t len)
 {
@@ -1860,8 +2207,9 @@ out:
  *     Sleep until more data has arrived. But check for races..
  */
 static long unix_stream_data_wait(struct sock *sk, long timeo,
-                                 struct sk_buff *last)
+                                 struct sk_buff *last, unsigned int last_len)
 {
+       struct sk_buff *tail;
        DEFINE_WAIT(wait);
 
        unix_state_lock(sk);
@@ -1869,14 +2217,16 @@ static long unix_stream_data_wait(struct sock *sk, long timeo,
        for (;;) {
                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 
-               if (skb_peek_tail(&sk->sk_receive_queue) != last ||
+               tail = skb_peek_tail(&sk->sk_receive_queue);
+               if (tail != last ||
+                   (tail && tail->len != last_len) ||
                    sk->sk_err ||
                    (sk->sk_shutdown & RCV_SHUTDOWN) ||
                    signal_pending(current) ||
                    !timeo)
                        break;
 
-               set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
+               sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
                unix_state_unlock(sk);
                timeo = freezable_schedule_timeout(timeo);
                unix_state_lock(sk);
@@ -1884,7 +2234,7 @@ static long unix_stream_data_wait(struct sock *sk, long timeo,
                if (sock_flag(sk, SOCK_DEAD))
                        break;
 
-               clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
+               sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
        }
 
        finish_wait(sk_sleep(sk), &wait);
@@ -1897,49 +2247,62 @@ static unsigned int unix_skb_len(const struct sk_buff *skb)
        return skb->len - UNIXCB(skb).consumed;
 }
 
-static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
-                              size_t size, int flags)
+struct unix_stream_read_state {
+       int (*recv_actor)(struct sk_buff *, int, int,
+                         struct unix_stream_read_state *);
+       struct socket *socket;
+       struct msghdr *msg;
+       struct pipe_inode_info *pipe;
+       size_t size;
+       int flags;
+       unsigned int splice_flags;
+};
+
+static int unix_stream_read_generic(struct unix_stream_read_state *state)
 {
        struct scm_cookie scm;
+       struct socket *sock = state->socket;
        struct sock *sk = sock->sk;
        struct unix_sock *u = unix_sk(sk);
-       DECLARE_SOCKADDR(struct sockaddr_un *, sunaddr, msg->msg_name);
        int copied = 0;
+       int flags = state->flags;
        int noblock = flags & MSG_DONTWAIT;
-       int check_creds = 0;
+       bool check_creds = false;
        int target;
        int err = 0;
        long timeo;
        int skip;
+       size_t size = state->size;
+       unsigned int last_len;
 
-       err = -EINVAL;
-       if (sk->sk_state != TCP_ESTABLISHED)
+       if (unlikely(sk->sk_state != TCP_ESTABLISHED)) {
+               err = -EINVAL;
                goto out;
+       }
 
-       err = -EOPNOTSUPP;
-       if (flags&MSG_OOB)
+       if (unlikely(flags & MSG_OOB)) {
+               err = -EOPNOTSUPP;
                goto out;
+       }
 
-       target = sock_rcvlowat(sk, flags&MSG_WAITALL, size);
+       target = sock_rcvlowat(sk, flags & MSG_WAITALL, size);
        timeo = sock_rcvtimeo(sk, noblock);
 
+       memset(&scm, 0, sizeof(scm));
+
        /* Lock the socket to prevent queue disordering
         * while sleeps in memcpy_tomsg
         */
+       mutex_lock(&u->readlock);
 
-       memset(&scm, 0, sizeof(scm));
-
-       err = mutex_lock_interruptible(&u->readlock);
-       if (unlikely(err)) {
-               /* recvmsg() in non blocking mode is supposed to return -EAGAIN
-                * sk_rcvtimeo is not honored by mutex_lock_interruptible()
-                */
-               err = noblock ? -EAGAIN : -ERESTARTSYS;
-               goto out;
-       }
+       if (flags & MSG_PEEK)
+               skip = sk_peek_offset(sk, flags);
+       else
+               skip = 0;
 
        do {
                int chunk;
+               bool drop_skb;
                struct sk_buff *skb, *last;
 
                unix_state_lock(sk);
@@ -1948,6 +2311,7 @@ static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
                        goto unlock;
                }
                last = skb = skb_peek(&sk->sk_receive_queue);
+               last_len = last ? last->len : 0;
 again:
                if (skb == NULL) {
                        unix_sk(sk)->recursion_level = 0;
@@ -1965,29 +2329,33 @@ again:
                                goto unlock;
 
                        unix_state_unlock(sk);
-                       err = -EAGAIN;
-                       if (!timeo)
+                       if (!timeo) {
+                               err = -EAGAIN;
                                break;
+                       }
+
                        mutex_unlock(&u->readlock);
 
-                       timeo = unix_stream_data_wait(sk, timeo, last);
+                       timeo = unix_stream_data_wait(sk, timeo, last,
+                                                     last_len);
 
-                       if (signal_pending(current)
-                           ||  mutex_lock_interruptible(&u->readlock)) {
+                       if (signal_pending(current)) {
                                err = sock_intr_errno(timeo);
+                               scm_destroy(&scm);
                                goto out;
                        }
 
+                       mutex_lock(&u->readlock);
                        continue;
- unlock:
+unlock:
                        unix_state_unlock(sk);
                        break;
                }
 
-               skip = sk_peek_offset(sk, flags);
                while (skip >= unix_skb_len(skb)) {
                        skip -= unix_skb_len(skb);
                        last = skb;
+                       last_len = skb->len;
                        skb = skb_peek_next(skb, &sk->sk_receive_queue);
                        if (!skb)
                                goto again;
@@ -1997,25 +2365,30 @@ again:
 
                if (check_creds) {
                        /* Never glue messages from different writers */
-                       if ((UNIXCB(skb).pid  != scm.pid) ||
-                           !uid_eq(UNIXCB(skb).uid, scm.creds.uid) ||
-                           !gid_eq(UNIXCB(skb).gid, scm.creds.gid))
+                       if (!unix_skb_scm_eq(skb, &scm))
                                break;
                } else if (test_bit(SOCK_PASSCRED, &sock->flags)) {
                        /* Copy credentials */
                        scm_set_cred(&scm, UNIXCB(skb).pid, UNIXCB(skb).uid, UNIXCB(skb).gid);
-                       check_creds = 1;
+                       unix_set_secdata(&scm, skb);
+                       check_creds = true;
                }
 
                /* Copy address just once */
-               if (sunaddr) {
-                       unix_copy_addr(msg, skb->sk);
+               if (state->msg && state->msg->msg_name) {
+                       DECLARE_SOCKADDR(struct sockaddr_un *, sunaddr,
+                                        state->msg->msg_name);
+                       unix_copy_addr(state->msg, skb->sk);
                        sunaddr = NULL;
                }
 
                chunk = min_t(unsigned int, unix_skb_len(skb) - skip, size);
-               if (skb_copy_datagram_msg(skb, UNIXCB(skb).consumed + skip,
-                                         msg, chunk)) {
+               skb_get(skb);
+               chunk = state->recv_actor(skb, skip, chunk, state);
+               drop_skb = !unix_skb_len(skb);
+               /* skb is only safe to use if !drop_skb */
+               consume_skb(skb);
+               if (chunk < 0) {
                        if (copied == 0)
                                copied = -EFAULT;
                        break;
@@ -2023,6 +2396,18 @@ again:
                copied += chunk;
                size -= chunk;
 
+               if (drop_skb) {
+                       /* the skb was touched by a concurrent reader;
+                        * we should not expect anything from this skb
+                        * anymore and assume it invalid - we can be
+                        * sure it was dropped from the socket queue
+                        *
+                        * let's report a short read
+                        */
+                       err = 0;
+                       break;
+               }
+
                /* Mark read part of skb as used */
                if (!(flags & MSG_PEEK)) {
                        UNIXCB(skb).consumed += chunk;
@@ -2048,16 +2433,101 @@ again:
 
                        sk_peek_offset_fwd(sk, chunk);
 
+                       if (UNIXCB(skb).fp)
+                               break;
+
+                       skip = 0;
+                       last = skb;
+                       last_len = skb->len;
+                       unix_state_lock(sk);
+                       skb = skb_peek_next(skb, &sk->sk_receive_queue);
+                       if (skb)
+                               goto again;
+                       unix_state_unlock(sk);
                        break;
                }
        } while (size);
 
        mutex_unlock(&u->readlock);
-       scm_recv(sock, msg, &scm, flags);
+       if (state->msg)
+               scm_recv(sock, state->msg, &scm, flags);
+       else
+               scm_destroy(&scm);
 out:
        return copied ? : err;
 }
 
+static int unix_stream_read_actor(struct sk_buff *skb,
+                                 int skip, int chunk,
+                                 struct unix_stream_read_state *state)
+{
+       int ret;
+
+       ret = skb_copy_datagram_msg(skb, UNIXCB(skb).consumed + skip,
+                                   state->msg, chunk);
+       return ret ?: chunk;
+}
+
+static int unix_stream_recvmsg(struct socket *sock, struct msghdr *msg,
+                              size_t size, int flags)
+{
+       struct unix_stream_read_state state = {
+               .recv_actor = unix_stream_read_actor,
+               .socket = sock,
+               .msg = msg,
+               .size = size,
+               .flags = flags
+       };
+
+       return unix_stream_read_generic(&state);
+}
+
+static ssize_t skb_unix_socket_splice(struct sock *sk,
+                                     struct pipe_inode_info *pipe,
+                                     struct splice_pipe_desc *spd)
+{
+       int ret;
+       struct unix_sock *u = unix_sk(sk);
+
+       mutex_unlock(&u->readlock);
+       ret = splice_to_pipe(pipe, spd);
+       mutex_lock(&u->readlock);
+
+       return ret;
+}
+
+static int unix_stream_splice_actor(struct sk_buff *skb,
+                                   int skip, int chunk,
+                                   struct unix_stream_read_state *state)
+{
+       return skb_splice_bits(skb, state->socket->sk,
+                              UNIXCB(skb).consumed + skip,
+                              state->pipe, chunk, state->splice_flags,
+                              skb_unix_socket_splice);
+}
+
+static ssize_t unix_stream_splice_read(struct socket *sock,  loff_t *ppos,
+                                      struct pipe_inode_info *pipe,
+                                      size_t size, unsigned int flags)
+{
+       struct unix_stream_read_state state = {
+               .recv_actor = unix_stream_splice_actor,
+               .socket = sock,
+               .pipe = pipe,
+               .size = size,
+               .splice_flags = flags,
+       };
+
+       if (unlikely(*ppos))
+               return -ESPIPE;
+
+       if (sock->file->f_flags & O_NONBLOCK ||
+           flags & SPLICE_F_NONBLOCK)
+               state.flags = MSG_DONTWAIT;
+
+       return unix_stream_read_generic(&state);
+}
+
 static int unix_shutdown(struct socket *sock, int mode)
 {
        struct sock *sk = sock->sk;
@@ -2231,20 +2701,22 @@ static unsigned int unix_dgram_poll(struct file *file, struct socket *sock,
                return mask;
 
        writable = unix_writable(sk);
-       other = unix_peer_get(sk);
-       if (other) {
-               if (unix_peer(other) != sk) {
-                       sock_poll_wait(file, &unix_sk(other)->peer_wait, wait);
-                       if (unix_recvq_full(other))
-                               writable = 0;
-               }
-               sock_put(other);
+       if (writable) {
+               unix_state_lock(sk);
+
+               other = unix_peer(sk);
+               if (other && unix_peer(other) != sk &&
+                   unix_recvq_full(other) &&
+                   unix_dgram_peer_wake_me(sk, other))
+                       writable = 0;
+
+               unix_state_unlock(sk);
        }
 
        if (writable)
                mask |= POLLOUT | POLLWRNORM | POLLWRBAND;
        else
-               set_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+               sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk);
 
        return mask;
 }