These changes are the raw update to the linux-4.4.6-rt14 kernel sources.
[kvmfornfv.git] / kernel / ipc / msg.c
index cedbf5f..b8c5e7f 100644 (file)
@@ -76,7 +76,7 @@ struct msg_sender {
 
 static inline struct msg_queue *msq_obtain_object(struct ipc_namespace *ns, int id)
 {
-       struct kern_ipc_perm *ipcp = ipc_obtain_object(&msg_ids(ns), id);
+       struct kern_ipc_perm *ipcp = ipc_obtain_object_idr(&msg_ids(ns), id);
 
        if (IS_ERR(ipcp))
                return ERR_CAST(ipcp);
@@ -137,13 +137,6 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
                return retval;
        }
 
-       /* ipc_addid() locks msq upon success. */
-       id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);
-       if (id < 0) {
-               ipc_rcu_putref(msq, msg_rcu_free);
-               return id;
-       }
-
        msq->q_stime = msq->q_rtime = 0;
        msq->q_ctime = get_seconds();
        msq->q_cbytes = msq->q_qnum = 0;
@@ -153,6 +146,13 @@ static int newque(struct ipc_namespace *ns, struct ipc_params *params)
        INIT_LIST_HEAD(&msq->q_receivers);
        INIT_LIST_HEAD(&msq->q_senders);
 
+       /* ipc_addid() locks msq upon success. */
+       id = ipc_addid(&msg_ids(ns), &msq->q_perm, ns->msg_ctlmni);
+       if (id < 0) {
+               ipc_rcu_putref(msq, msg_rcu_free);
+               return id;
+       }
+
        ipc_unlock_object(&msq->q_perm);
        rcu_read_unlock();
 
@@ -183,29 +183,15 @@ static void ss_wakeup(struct list_head *h, int kill)
        }
 }
 
-static void expunge_all(struct msg_queue *msq, int res)
+static void expunge_all(struct msg_queue *msq, int res,
+                       struct wake_q_head *wake_q)
 {
        struct msg_receiver *msr, *t;
 
        list_for_each_entry_safe(msr, t, &msq->q_receivers, r_list) {
-               /*
-                * Make sure that the wakeup doesnt preempt
-                * this CPU prematurely. (on PREEMPT_RT)
-                */
-               preempt_disable_rt();
 
-               msr->r_msg = NULL; /* initialize expunge ordering */
-               wake_up_process(msr->r_tsk);
-               /*
-                * Ensure that the wakeup is visible before setting r_msg as
-                * the receiving end depends on it: either spinning on a nil,
-                * or dealing with -EAGAIN cases. See lockless receive part 1
-                * and 2 in do_msgrcv().
-                */
-               smp_mb();
+               wake_q_add(wake_q, msr->r_tsk);
                msr->r_msg = ERR_PTR(res);
-
-               preempt_enable_rt();
        }
 }
 
@@ -221,11 +207,13 @@ static void freeque(struct ipc_namespace *ns, struct kern_ipc_perm *ipcp)
 {
        struct msg_msg *msg, *t;
        struct msg_queue *msq = container_of(ipcp, struct msg_queue, q_perm);
+       WAKE_Q(wake_q);
 
-       expunge_all(msq, -EIDRM);
+       expunge_all(msq, -EIDRM, &wake_q);
        ss_wakeup(&msq->q_senders, 1);
        msg_rmid(ns, msq);
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
        rcu_read_unlock();
 
        list_for_each_entry_safe(msg, t, &msq->q_messages, m_list) {
@@ -350,6 +338,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
        struct kern_ipc_perm *ipcp;
        struct msqid64_ds uninitialized_var(msqid64);
        struct msg_queue *msq;
+       WAKE_Q(wake_q);
        int err;
 
        if (cmd == IPC_SET) {
@@ -397,7 +386,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
                /* sleeping receivers might be excluded by
                 * stricter permissions.
                 */
-               expunge_all(msq, -EAGAIN);
+               expunge_all(msq, -EAGAIN, &wake_q);
                /* sleeping senders might be able to send
                 * due to a larger queue size.
                 */
@@ -410,6 +399,7 @@ static int msgctl_down(struct ipc_namespace *ns, int msqid, int cmd,
 
 out_unlock0:
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
 out_unlock1:
        rcu_read_unlock();
 out_up:
@@ -574,7 +564,8 @@ static int testmsg(struct msg_msg *msg, long type, int mode)
        return 0;
 }
 
-static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
+static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg,
+                                struct wake_q_head *wake_q)
 {
        struct msg_receiver *msr, *t;
 
@@ -582,39 +573,21 @@ static inline int pipelined_send(struct msg_queue *msq, struct msg_msg *msg)
                if (testmsg(msg, msr->r_msgtype, msr->r_mode) &&
                    !security_msg_queue_msgrcv(msq, msg, msr->r_tsk,
                                               msr->r_msgtype, msr->r_mode)) {
-                       /*
-                        * Make sure that the wakeup doesnt preempt
-                        * this CPU prematurely. (on PREEMPT_RT)
-                        */
-                       preempt_disable_rt();
 
                        list_del(&msr->r_list);
                        if (msr->r_maxsize < msg->m_ts) {
-                               /* initialize pipelined send ordering */
-                               msr->r_msg = NULL;
-                               wake_up_process(msr->r_tsk);
-                               smp_mb(); /* see barrier comment below */
+                               wake_q_add(wake_q, msr->r_tsk);
                                msr->r_msg = ERR_PTR(-E2BIG);
                        } else {
-                               msr->r_msg = NULL;
                                msq->q_lrpid = task_pid_vnr(msr->r_tsk);
                                msq->q_rtime = get_seconds();
-                               wake_up_process(msr->r_tsk);
-                               /*
-                                * Ensure that the wakeup is visible before
-                                * setting r_msg, as the receiving end depends
-                                * on it. See lockless receive part 1 and 2 in
-                                * do_msgrcv().
-                                */
-                               smp_mb();
+                               wake_q_add(wake_q, msr->r_tsk);
                                msr->r_msg = msg;
-                               preempt_enable_rt();
-
                                return 1;
                        }
-                       preempt_enable_rt();
                }
        }
+
        return 0;
 }
 
@@ -625,6 +598,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
        struct msg_msg *msg;
        int err;
        struct ipc_namespace *ns;
+       WAKE_Q(wake_q);
 
        ns = current->nsproxy->ipc_ns;
 
@@ -710,7 +684,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
        msq->q_lspid = task_tgid_vnr(current);
        msq->q_stime = get_seconds();
 
-       if (!pipelined_send(msq, msg)) {
+       if (!pipelined_send(msq, msg, &wake_q)) {
                /* no one is waiting for this message, enqueue it */
                list_add_tail(&msg->m_list, &msq->q_messages);
                msq->q_cbytes += msgsz;
@@ -724,6 +698,7 @@ long do_msgsnd(int msqid, long mtype, void __user *mtext,
 
 out_unlock0:
        ipc_unlock_object(&msq->q_perm);
+       wake_up_q(&wake_q);
 out_unlock1:
        rcu_read_unlock();
        if (msg != NULL)
@@ -944,31 +919,25 @@ long do_msgrcv(int msqid, void __user *buf, size_t bufsz, long msgtyp, int msgfl
                rcu_read_lock();
 
                /* Lockless receive, part 2:
-                * Wait until pipelined_send or expunge_all are outside of
-                * wake_up_process(). There is a race with exit(), see
-                * ipc/mqueue.c for the details.
+                * The work in pipelined_send() and expunge_all():
+                * - Set pointer to message
+                * - Queue the receiver task for later wakeup
+                * - Wake up the process after the lock is dropped.
+                *
+                * Should the process wake up before this wakeup (due to a
+                * signal) it will either see the message and continue …
                 */
-               msg = (struct msg_msg *)msr_d.r_msg;
-               while (msg == NULL) {
-                       cpu_relax();
-                       msg = (struct msg_msg *)msr_d.r_msg;
-               }
 
-               /* Lockless receive, part 3:
-                * If there is a message or an error then accept it without
-                * locking.
-                */
+               msg = (struct msg_msg *)msr_d.r_msg;
                if (msg != ERR_PTR(-EAGAIN))
                        goto out_unlock1;
 
-               /* Lockless receive, part 3:
-                * Acquire the queue spinlock.
-                */
+                /*
+                 * … or see -EAGAIN, acquire the lock to check the message
+                 * again.
+                 */
                ipc_lock_object(&msq->q_perm);
 
-               /* Lockless receive, part 4:
-                * Repeat test after acquiring the spinlock.
-                */
                msg = (struct msg_msg *)msr_d.r_msg;
                if (msg != ERR_PTR(-EAGAIN))
                        goto out_unlock0;