tipc: change sk_buffer handling in tipc_link_xmit()
[cascardo/linux.git] / net / tipc / link.c
index eaa9fe5..ea32679 100644 (file)
@@ -132,9 +132,11 @@ static void tipc_link_put(struct tipc_link *l_ptr)
 
 static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
 {
-       if (l->owner->active_links[0] != l)
-               return l->owner->active_links[0];
-       return l->owner->active_links[1];
+       struct tipc_node *n = l->owner;
+
+       if (node_active_link(n, 0) != l)
+               return node_active_link(n, 0);
+       return node_active_link(n, 1);
 }
 
 /*
@@ -147,10 +149,11 @@ int tipc_link_is_up(struct tipc_link *l_ptr)
        return link_working_working(l_ptr) || link_working_unknown(l_ptr);
 }
 
-int tipc_link_is_active(struct tipc_link *l_ptr)
+int tipc_link_is_active(struct tipc_link *l)
 {
-       return  (l_ptr->owner->active_links[0] == l_ptr) ||
-               (l_ptr->owner->active_links[1] == l_ptr);
+       struct tipc_node *n = l->owner;
+
+       return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
 }
 
 /**
@@ -224,7 +227,9 @@ static void link_set_timer(struct tipc_link *link, unsigned long time)
  */
 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
                                   struct tipc_bearer *b_ptr,
-                                  const struct tipc_media_addr *media_addr)
+                                  const struct tipc_media_addr *media_addr,
+                                  struct sk_buff_head *inputq,
+                                  struct sk_buff_head *namedq)
 {
        struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
        struct tipc_link *l_ptr;
@@ -240,7 +245,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
                return NULL;
        }
 
-       if (n_ptr->links[b_ptr->identity]) {
+       if (n_ptr->links[b_ptr->identity].link) {
                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_err("Attempt to establish second link on <%s> to %s\n",
                       b_ptr->name, addr_string);
@@ -286,8 +291,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
        __skb_queue_head_init(&l_ptr->backlogq);
        __skb_queue_head_init(&l_ptr->deferdq);
        skb_queue_head_init(&l_ptr->wakeupq);
-       skb_queue_head_init(&l_ptr->inputq);
-       skb_queue_head_init(&l_ptr->namedq);
+       l_ptr->inputq = inputq;
+       l_ptr->namedq = namedq;
+       skb_queue_head_init(l_ptr->inputq);
        link_reset_statistics(l_ptr);
        tipc_node_attach_link(n_ptr, l_ptr);
        setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
@@ -321,7 +327,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
        rcu_read_lock();
        list_for_each_entry_rcu(node, &tn->node_list, list) {
                tipc_node_lock(node);
-               link = node->links[bearer_id];
+               link = node->links[bearer_id].link;
                if (link)
                        tipc_link_delete(link);
                tipc_node_unlock(node);
@@ -334,7 +340,7 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
  * @link: congested link
  * @list: message that was attempted sent
  * Create pseudo msg to send back to user when congestion abates
- * Only consumes message if there is an error
+ * Does not consume buffer list
  */
 static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 {
@@ -348,7 +354,7 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
                tipc_link_reset(link);
-               goto err;
+               return -ENOBUFS;
        }
        /* Non-blocking sender: */
        if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
@@ -358,15 +364,12 @@ static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
        skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
                              addr, addr, oport, 0, 0);
        if (!skb)
-               goto err;
+               return -ENOBUFS;
        TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
        TIPC_SKB_CB(skb)->chain_imp = imp;
        skb_queue_tail(&link->wakeupq, skb);
        link->stats.link_congs++;
        return -ELINKCONG;
-err:
-       __skb_queue_purge(list);
-       return -ENOBUFS;
 }
 
 /**
@@ -388,8 +391,8 @@ void link_prepare_wakeup(struct tipc_link *l)
                if ((pnd[imp] + l->backlog[imp].len) >= lim)
                        break;
                skb_unlink(skb, &l->wakeupq);
-               skb_queue_tail(&l->inputq, skb);
-               l->owner->inputq = &l->inputq;
+               skb_queue_tail(l->inputq, skb);
+               l->owner->inputq = l->inputq;
                l->owner->action_flags |= TIPC_MSG_EVT;
        }
 }
@@ -446,7 +449,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
                return;
 
-       tipc_node_link_down(l_ptr->owner, l_ptr);
+       tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
        tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
 
        if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
@@ -462,7 +465,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        __skb_queue_purge(&l_ptr->transmq);
        __skb_queue_purge(&l_ptr->deferdq);
        if (!owner->inputq)
-               owner->inputq = &l_ptr->inputq;
+               owner->inputq = l_ptr->inputq;
        skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
        if (!skb_queue_empty(owner->inputq))
                owner->action_flags |= TIPC_MSG_EVT;
@@ -482,7 +485,7 @@ static void link_activate(struct tipc_link *link)
        link->rcv_nxt = 1;
        link->stats.recv_info = 1;
        link->silent_intv_cnt = 0;
-       tipc_node_link_up(node, link);
+       tipc_node_link_up(node, link->bearer_id);
        tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
 }
 
@@ -577,7 +580,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                case TRAFFIC_MSG_EVT:
                        break;
                case ACTIVATE_MSG:
-                       other = l_ptr->owner->active_links[0];
+                       other = node_active_link(l_ptr->owner, 0);
                        if (other && link_working_unknown(other))
                                break;
                        l_ptr->state = WORKING_WORKING;
@@ -606,7 +609,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                switch (event) {
                case TRAFFIC_MSG_EVT:
                case ACTIVATE_MSG:
-                       other = l_ptr->owner->active_links[0];
+                       other = node_active_link(l_ptr->owner, 0);
                        if (other && link_working_unknown(other))
                                break;
                        l_ptr->state = WORKING_WORKING;
@@ -635,8 +638,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
  * @link: link to use
  * @list: chain of buffers containing message
  *
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain, except when returning an error code,
  * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
  * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
@@ -660,10 +662,9 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
                if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
                        return link_schedule_user(link, list);
        }
-       if (unlikely(msg_size(msg) > mtu)) {
-               __skb_queue_purge(list);
+       if (unlikely(msg_size(msg) > mtu))
                return -EMSGSIZE;
-       }
+
        /* Prepare each packet for sending, and add to relevant queue: */
        while (skb_queue_len(list)) {
                skb = skb_peek(list);
@@ -716,7 +717,7 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 
 /* tipc_link_xmit_skb(): send single buffer to destination
  * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
- * messages, which will not be rejected
+ * messages, which will not cause link congestion
  * The only exception is datagram messages rerouted after secondary
  * lookup, which are rare and safe to dispose of anyway.
  * TODO: Return real return value, and let callers use
@@ -730,7 +731,7 @@ int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 
        skb2list(skb, &head);
        rc = tipc_link_xmit(net, &head, dnode, selector);
-       if (rc == -ELINKCONG)
+       if (rc)
                kfree_skb(skb);
        return 0;
 }
@@ -742,7 +743,7 @@ int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
  * @dsz: amount of user data to be sent
  * @dnode: address of destination node
  * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain, except when returning error
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
 int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
@@ -755,7 +756,7 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
        node = tipc_node_find(net, dnode);
        if (node) {
                tipc_node_lock(node);
-               link = node->active_links[selector & 1];
+               link = node_active_link(node, selector & 1);
                if (link)
                        rc = __tipc_link_xmit(net, link, list);
                tipc_node_unlock(node);
@@ -858,9 +859,9 @@ void tipc_link_reset_all(struct tipc_node *node)
                tipc_addr_string_fill(addr_string, node->addr));
 
        for (i = 0; i < MAX_BEARERS; i++) {
-               if (node->links[i]) {
-                       link_print(node->links[i], "Resetting link\n");
-                       tipc_link_reset(node->links[i]);
+               if (node->links[i].link) {
+                       link_print(node->links[i].link, "Resetting link\n");
+                       tipc_link_reset(node->links[i].link);
                }
        }
 
@@ -959,7 +960,7 @@ static bool link_synch(struct tipc_link *l)
 
        /* Is it still in the input queue ? */
        post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
-       if (skb_queue_len(&pl->inputq) > post_synch)
+       if (skb_queue_len(pl->inputq) > post_synch)
                return false;
 synched:
        l->flags &= ~LINK_SYNCHING;
@@ -1029,7 +1030,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 
                tipc_node_lock(n_ptr);
                /* Locate unicast link endpoint that should handle message */
-               l_ptr = n_ptr->links[b_ptr->identity];
+               l_ptr = n_ptr->links[b_ptr->identity].link;
                if (unlikely(!l_ptr))
                        goto unlock;
 
@@ -1138,16 +1139,16 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
        case TIPC_HIGH_IMPORTANCE:
        case TIPC_CRITICAL_IMPORTANCE:
        case CONN_MANAGER:
-               if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
-                       node->inputq = &link->inputq;
+               if (tipc_skb_queue_tail(link->inputq, skb, dport)) {
+                       node->inputq = link->inputq;
                        node->action_flags |= TIPC_MSG_EVT;
                }
                return true;
        case NAME_DISTRIBUTOR:
                node->bclink.recv_permitted = true;
-               node->namedq = &link->namedq;
-               skb_queue_tail(&link->namedq, skb);
-               if (skb_queue_len(&link->namedq) == 1)
+               node->namedq = link->namedq;
+               skb_queue_tail(link->namedq, skb);
+               if (skb_queue_len(link->namedq) == 1)
                        node->action_flags |= TIPC_NAMED_MSG_EVT;
                return true;
        case MSG_BUNDLER:
@@ -1496,7 +1497,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
        struct sk_buff *skb;
        u32 length = msg_size(msg);
 
-       tunnel = l_ptr->owner->active_links[selector & 1];
+       tunnel = node_active_link(l_ptr->owner, selector & 1);
        if (!tipc_link_is_up(tunnel)) {
                pr_warn("%stunnel link no longer available\n", link_co_err);
                return;
@@ -1522,7 +1523,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
        int msgcount;
-       struct tipc_link *tunnel = l_ptr->owner->active_links[0];
+       struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
        struct tipc_msg tunnel_hdr;
        struct sk_buff *skb;
        int split_bundles;
@@ -1556,8 +1557,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
                return;
        }
 
-       split_bundles = (l_ptr->owner->active_links[0] !=
-                        l_ptr->owner->active_links[1]);
+       split_bundles = (node_active_link(l_ptr->owner, 0) !=
+                        node_active_link(l_ptr->owner, 1));
 
        skb_queue_walk(&l_ptr->transmq, skb) {
                struct tipc_msg *msg = buf_msg(skb);
@@ -1660,7 +1661,7 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
        if (bearer_id == link->bearer_id)
                goto exit;
 
-       pl = link->owner->links[bearer_id];
+       pl = link->owner->links[bearer_id].link;
        if (pl && tipc_link_is_up(pl))
                tipc_link_reset(pl);
 
@@ -1743,7 +1744,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
        list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
                tipc_node_lock(n_ptr);
                for (i = 0; i < MAX_BEARERS; i++) {
-                       l_ptr = n_ptr->links[i];
+                       l_ptr = n_ptr->links[i].link;
                        if (l_ptr && !strcmp(l_ptr->name, link_name)) {
                                *bearer_id = i;
                                found_node = n_ptr;
@@ -1865,7 +1866,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
 
        tipc_node_lock(node);
 
-       link = node->links[bearer_id];
+       link = node->links[bearer_id].link;
        if (!link) {
                res = -EINVAL;
                goto out;
@@ -2055,10 +2056,11 @@ static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
        for (i = *prev_link; i < MAX_BEARERS; i++) {
                *prev_link = i;
 
-               if (!node->links[i])
+               if (!node->links[i].link)
                        continue;
 
-               err = __tipc_nl_add_link(net, msg, node->links[i], NLM_F_MULTI);
+               err = __tipc_nl_add_link(net, msg,
+                                        node->links[i].link, NLM_F_MULTI);
                if (err)
                        return err;
        }
@@ -2172,7 +2174,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
                        return -EINVAL;
 
                tipc_node_lock(node);
-               link = node->links[bearer_id];
+               link = node->links[bearer_id].link;
                if (!link) {
                        tipc_node_unlock(node);
                        nlmsg_free(msg.skb);
@@ -2227,7 +2229,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
 
        tipc_node_lock(node);
 
-       link = node->links[bearer_id];
+       link = node->links[bearer_id].link;
        if (!link) {
                tipc_node_unlock(node);
                return -EINVAL;