These changes are the raw update to linux-4.4.6-rt14. Kernel sources
[kvmfornfv.git] / kernel / net / tipc / msg.c
index c3e96e8..8740930 100644 (file)
@@ -121,7 +121,7 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 {
        struct sk_buff *head = *headbuf;
        struct sk_buff *frag = *buf;
-       struct sk_buff *tail;
+       struct sk_buff *tail = NULL;
        struct tipc_msg *msg;
        u32 fragid;
        int delta;
@@ -141,9 +141,15 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
                if (unlikely(skb_unclone(frag, GFP_ATOMIC)))
                        goto err;
                head = *headbuf = frag;
-               skb_frag_list_init(head);
-               TIPC_SKB_CB(head)->tail = NULL;
                *buf = NULL;
+               TIPC_SKB_CB(head)->tail = NULL;
+               if (skb_is_nonlinear(head)) {
+                       skb_walk_frags(head, tail) {
+                               TIPC_SKB_CB(head)->tail = tail;
+                       }
+               } else {
+                       skb_frag_list_init(head);
+               }
                return 0;
        }
 
@@ -176,7 +182,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
        *buf = NULL;
        return 0;
 err:
-       pr_warn_ratelimited("Unable to build fragment list\n");
        kfree_skb(*buf);
        kfree_skb(*headbuf);
        *buf = *headbuf = NULL;
@@ -331,16 +336,15 @@ error:
 
 /**
  * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
- * @bskb: the buffer to append to ("bundle")
- * @skb:  buffer to be appended
+ * @skb: the buffer to append to ("bundle")
+ * @msg:  message to be appended
  * @mtu:  max allowable size for the bundle buffer
  * Consumes buffer if successful
  * Returns true if bundling could be performed, otherwise false
  */
-bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
+bool tipc_msg_bundle(struct sk_buff *skb, struct tipc_msg *msg, u32 mtu)
 {
        struct tipc_msg *bmsg;
-       struct tipc_msg *msg = buf_msg(skb);
        unsigned int bsz;
        unsigned int msz = msg_size(msg);
        u32 start, pad;
@@ -348,9 +352,9 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
 
        if (likely(msg_user(msg) == MSG_FRAGMENTER))
                return false;
-       if (!bskb)
+       if (!skb)
                return false;
-       bmsg = buf_msg(bskb);
+       bmsg = buf_msg(skb);
        bsz = msg_size(bmsg);
        start = align(bsz);
        pad = start - bsz;
@@ -359,18 +363,20 @@ bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
                return false;
        if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
                return false;
-       if (likely(msg_user(bmsg) != MSG_BUNDLER))
+       if (unlikely(msg_user(bmsg) != MSG_BUNDLER))
                return false;
-       if (unlikely(skb_tailroom(bskb) < (pad + msz)))
+       if (unlikely(skb_tailroom(skb) < (pad + msz)))
                return false;
        if (unlikely(max < (start + msz)))
                return false;
+       if ((msg_importance(msg) < TIPC_SYSTEM_IMPORTANCE) &&
+           (msg_importance(bmsg) == TIPC_SYSTEM_IMPORTANCE))
+               return false;
 
-       skb_put(bskb, pad + msz);
-       skb_copy_to_linear_data_offset(bskb, start, skb->data, msz);
+       skb_put(skb, pad + msz);
+       skb_copy_to_linear_data_offset(skb, start, msg, msz);
        msg_set_size(bmsg, start + msz);
        msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
-       kfree_skb(skb);
        return true;
 }
 
@@ -416,18 +422,18 @@ none:
 
 /**
  * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
- * @list: the buffer chain
- * @skb: buffer to be appended and replaced
+ * @list: the buffer chain, where head is the buffer to replace/append
+ * @skb: buffer to be created, appended to and returned in case of success
+ * @msg: message to be appended
  * @mtu: max allowable size for the bundle buffer, inclusive header
  * @dnode: destination node for message. (Not always present in header)
- * Replaces buffer if successful
  * Returns true if success, otherwise false
  */
-bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
+bool tipc_msg_make_bundle(struct sk_buff **skb,  struct tipc_msg *msg,
+                         u32 mtu, u32 dnode)
 {
-       struct sk_buff *bskb;
+       struct sk_buff *_skb;
        struct tipc_msg *bmsg;
-       struct tipc_msg *msg = buf_msg(*skb);
        u32 msz = msg_size(msg);
        u32 max = mtu - INT_H_SIZE;
 
@@ -440,78 +446,94 @@ bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
        if (msz > (max / 2))
                return false;
 
-       bskb = tipc_buf_acquire(max);
-       if (!bskb)
+       _skb = tipc_buf_acquire(max);
+       if (!_skb)
                return false;
 
-       skb_trim(bskb, INT_H_SIZE);
-       bmsg = buf_msg(bskb);
+       skb_trim(_skb, INT_H_SIZE);
+       bmsg = buf_msg(_skb);
        tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
                      INT_H_SIZE, dnode);
+       if (msg_isdata(msg))
+               msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE);
+       else
+               msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE);
        msg_set_seqno(bmsg, msg_seqno(msg));
        msg_set_ack(bmsg, msg_ack(msg));
        msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
-       tipc_msg_bundle(bskb, *skb, mtu);
-       *skb = bskb;
+       tipc_msg_bundle(_skb, msg, mtu);
+       *skb = _skb;
        return true;
 }
 
 /**
  * tipc_msg_reverse(): swap source and destination addresses and add error code
- * @buf:  buffer containing message to be reversed
- * @dnode: return value: node where to send message after reversal
- * @err:  error code to be set in message
- * Consumes buffer if failure
+ * @own_node: originating node id for reversed message
+ * @skb:  buffer containing message to be reversed; may be replaced.
+ * @err:  error code to be set in message, if any
+ * Consumes buffer at failure
  * Returns true if success, otherwise false
  */
-bool tipc_msg_reverse(u32 own_addr,  struct sk_buff *buf, u32 *dnode,
-                     int err)
+bool tipc_msg_reverse(u32 own_node,  struct sk_buff **skb, int err)
 {
-       struct tipc_msg *msg = buf_msg(buf);
+       struct sk_buff *_skb = *skb;
+       struct tipc_msg *hdr = buf_msg(_skb);
        struct tipc_msg ohdr;
-       uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
+       int dlen = min_t(uint, msg_data_sz(hdr), MAX_FORWARD_SIZE);
 
-       if (skb_linearize(buf))
+       if (skb_linearize(_skb))
                goto exit;
-       msg = buf_msg(buf);
-       if (msg_dest_droppable(msg))
+       hdr = buf_msg(_skb);
+       if (msg_dest_droppable(hdr))
                goto exit;
-       if (msg_errcode(msg))
+       if (msg_errcode(hdr))
                goto exit;
-       memcpy(&ohdr, msg, msg_hdr_sz(msg));
-       msg_set_errcode(msg, err);
-       msg_set_origport(msg, msg_destport(&ohdr));
-       msg_set_destport(msg, msg_origport(&ohdr));
-       msg_set_prevnode(msg, own_addr);
-       if (!msg_short(msg)) {
-               msg_set_orignode(msg, msg_destnode(&ohdr));
-               msg_set_destnode(msg, msg_orignode(&ohdr));
+
+       /* Take a copy of original header before altering message */
+       memcpy(&ohdr, hdr, msg_hdr_sz(hdr));
+
+       /* Never return SHORT header; expand by replacing buffer if necessary */
+       if (msg_short(hdr)) {
+               *skb = tipc_buf_acquire(BASIC_H_SIZE + dlen);
+               if (!*skb)
+                       goto exit;
+               memcpy((*skb)->data + BASIC_H_SIZE, msg_data(hdr), dlen);
+               kfree_skb(_skb);
+               _skb = *skb;
+               hdr = buf_msg(_skb);
+               memcpy(hdr, &ohdr, BASIC_H_SIZE);
+               msg_set_hdr_sz(hdr, BASIC_H_SIZE);
        }
-       msg_set_size(msg, msg_hdr_sz(msg) + rdsz);
-       skb_trim(buf, msg_size(msg));
-       skb_orphan(buf);
-       *dnode = msg_orignode(&ohdr);
+
+       /* Now reverse the concerned fields */
+       msg_set_errcode(hdr, err);
+       msg_set_origport(hdr, msg_destport(&ohdr));
+       msg_set_destport(hdr, msg_origport(&ohdr));
+       msg_set_destnode(hdr, msg_prevnode(&ohdr));
+       msg_set_prevnode(hdr, own_node);
+       msg_set_orignode(hdr, own_node);
+       msg_set_size(hdr, msg_hdr_sz(hdr) + dlen);
+       skb_trim(_skb, msg_size(hdr));
+       skb_orphan(_skb);
        return true;
 exit:
-       kfree_skb(buf);
-       *dnode = 0;
+       kfree_skb(_skb);
+       *skb = NULL;
        return false;
 }
 
 /**
  * tipc_msg_lookup_dest(): try to find new destination for named message
  * @skb: the buffer containing the message.
- * @dnode: return value: next-hop node, if destination found
- * @err: return value: error code to use, if message to be rejected
+ * @err: error code to be used by caller if lookup fails
  * Does not consume buffer
  * Returns true if a destination is found, false otherwise
  */
-bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
-                         u32 *dnode, int *err)
+bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
 {
        struct tipc_msg *msg = buf_msg(skb);
-       u32 dport;
-       u32 own_addr = tipc_own_addr(net);
+       u32 dport, dnode;
+       u32 onode = tipc_own_addr(net);
 
        if (!msg_isdata(msg))
                return false;
@@ -522,17 +544,18 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
        *err = -TIPC_ERR_NO_NAME;
        if (skb_linearize(skb))
                return false;
+       msg = buf_msg(skb);
        if (msg_reroute_cnt(msg))
                return false;
-       *dnode = addr_domain(net, msg_lookup_scope(msg));
+       dnode = addr_domain(net, msg_lookup_scope(msg));
        dport = tipc_nametbl_translate(net, msg_nametype(msg),
-                                      msg_nameinst(msg), dnode);
+                                      msg_nameinst(msg), &dnode);
        if (!dport)
                return false;
        msg_incr_reroute_cnt(msg);
-       if (*dnode != own_addr)
-               msg_set_prevnode(msg, own_addr);
-       msg_set_destnode(msg, *dnode);
+       if (dnode != onode)
+               msg_set_prevnode(msg, onode);
+       msg_set_destnode(msg, dnode);
        msg_set_destport(msg, dport);
        *err = TIPC_OK;
        return true;
@@ -541,18 +564,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb,
 /* tipc_msg_reassemble() - clone a buffer chain of fragments and
  *                         reassemble the clones into one message
  */
-struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
+bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
 {
-       struct sk_buff *skb;
+       struct sk_buff *skb, *_skb;
        struct sk_buff *frag = NULL;
        struct sk_buff *head = NULL;
-       int hdr_sz;
+       int hdr_len;
 
        /* Copy header if single buffer */
        if (skb_queue_len(list) == 1) {
                skb = skb_peek(list);
-               hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
-               return __pskb_copy(skb, hdr_sz, GFP_ATOMIC);
+               hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
+               _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
+               if (!_skb)
+                       return false;
+               __skb_queue_tail(rcvq, _skb);
+               return true;
        }
 
        /* Clone all fragments and reassemble */
@@ -566,9 +593,41 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
                if (!head)
                        goto error;
        }
-       return frag;
+       __skb_queue_tail(rcvq, frag);
+       return true;
 error:
        pr_warn("Failed do clone local mcast rcv buffer\n");
        kfree_skb(head);
-       return NULL;
+       return false;
+}
+
+/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
+ * @list: list to be appended to
+ * @seqno: sequence number of buffer to add
+ * @skb: buffer to add
+ */
+void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
+                            struct sk_buff *skb)
+{
+       struct sk_buff *_skb, *tmp;
+
+       if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
+               __skb_queue_head(list, skb);
+               return;
+       }
+
+       if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
+               __skb_queue_tail(list, skb);
+               return;
+       }
+
+       skb_queue_walk_safe(list, _skb, tmp) {
+               if (more(seqno, buf_seqno(_skb)))
+                       continue;
+               if (seqno == buf_seqno(_skb))
+                       break;
+               __skb_queue_before(list, _skb, skb);
+               return;
+       }
+       kfree_skb(skb);
 }