tipc: make resetting of links non-atomic
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 30 Jul 2015 22:24:23 +0000 (18:24 -0400)
committerDavid S. Miller <davem@davemloft.net>
Fri, 31 Jul 2015 00:25:14 +0000 (17:25 -0700)
In order to facilitate future improvements to the locking structure, we
want to make resetting and establishing of links non-atomic. I.e., the
functions tipc_node_link_up() and tipc_node_link_down() should be called
from outside the node lock context, and grab/release the node lock
themselves. This requires that we can freeze the link state from the
moment it is set to RESETTING or PEER_RESET in one lock context until
it is set to RESET or ESTABLISHING in a later context. The recently
introduced link FSM makes this possible, so we are now ready to introduce
the above change.

This commit implements this.

Tested-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c
net/tipc/msg.h
net/tipc/node.c

index 9840b03..3a92924 100644 (file)
@@ -489,8 +489,8 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
                xmit = true;
                mtyp = ACTIVATE_MSG;
                break;
-       case LINK_RESETTING:
        case LINK_PEER_RESET:
+       case LINK_RESETTING:
        case LINK_FAILINGOVER:
                break;
        default:
index 115bb2a..53d98ef 100644 (file)
@@ -916,4 +916,33 @@ static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
        return false;
 }
 
+/* tipc_skb_queue_splice_tail - append an skb list to lock protected list
+ * @list: the new list to append. Not lock protected
+ * @head: target list. Lock protected.
+ */
+static inline void tipc_skb_queue_splice_tail(struct sk_buff_head *list,
+                                             struct sk_buff_head *head)
+{
+       spin_lock_bh(&head->lock);
+       skb_queue_splice_tail(list, head);
+       spin_unlock_bh(&head->lock);
+}
+
+/* tipc_skb_queue_splice_tail_init - merge two lock protected skb lists
+ * @list: the new list to add. Lock protected. Will be reinitialized
+ * @head: target list. Lock protected.
+ */
+static inline void tipc_skb_queue_splice_tail_init(struct sk_buff_head *list,
+                                                  struct sk_buff_head *head)
+{
+       struct sk_buff_head tmp;
+
+       __skb_queue_head_init(&tmp);
+
+       spin_lock_bh(&list->lock);
+       skb_queue_splice_tail_init(list, &tmp);
+       spin_unlock_bh(&list->lock);
+       tipc_skb_queue_splice_tail(&tmp, head);
+}
+
 #endif
index d03e88f..cdca57b 100644 (file)
@@ -66,8 +66,12 @@ enum {
        NODE_SYNCH_END_EVT      = 0xcee
 };
 
-static void tipc_node_link_down(struct tipc_node *n, int bearer_id);
-static void node_lost_contact(struct tipc_node *n_ptr);
+static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
+                                 struct sk_buff_head *xmitq,
+                                 struct tipc_media_addr **maddr);
+static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
+                               bool delete);
+static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
 static void node_established_contact(struct tipc_node *n_ptr);
 static void tipc_node_delete(struct tipc_node *node);
 static void tipc_node_timeout(unsigned long data);
@@ -275,9 +279,8 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 static void tipc_node_timeout(unsigned long data)
 {
        struct tipc_node *n = (struct tipc_node *)data;
+       struct tipc_link_entry *le;
        struct sk_buff_head xmitq;
-       struct tipc_link *l;
-       struct tipc_media_addr *maddr;
        int bearer_id;
        int rc = 0;
 
@@ -285,17 +288,16 @@ static void tipc_node_timeout(unsigned long data)
 
        for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
                tipc_node_lock(n);
-               l = n->links[bearer_id].link;
-               if (l) {
+               le = &n->links[bearer_id];
+               if (le->link) {
                        /* Link tolerance may change asynchronously: */
-                       tipc_node_calculate_timer(n, l);
-                       rc = tipc_link_timeout(l, &xmitq);
-                       if (rc & TIPC_LINK_DOWN_EVT)
-                               tipc_node_link_down(n, bearer_id);
+                       tipc_node_calculate_timer(n, le->link);
+                       rc = tipc_link_timeout(le->link, &xmitq);
                }
                tipc_node_unlock(n);
-               maddr = &n->links[bearer_id].maddr;
-               tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
+               tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
+               if (rc & TIPC_LINK_DOWN_EVT)
+                       tipc_node_link_down(n, bearer_id, false);
        }
        if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
                tipc_node_get(n);
@@ -303,18 +305,21 @@ static void tipc_node_timeout(unsigned long data)
 }
 
 /**
- * tipc_node_link_up - handle addition of link
- *
+ * __tipc_node_link_up - handle addition of link
+ * Node lock must be held by caller
  * Link becomes active (alone or shared) or standby, depending on its priority.
  */
-static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
-                             struct sk_buff_head *xmitq)
+static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
+                               struct sk_buff_head *xmitq)
 {
        int *slot0 = &n->active_links[0];
        int *slot1 = &n->active_links[1];
        struct tipc_link *ol = node_active_link(n, 0);
        struct tipc_link *nl = n->links[bearer_id].link;
 
+       if (!nl || !tipc_link_is_up(nl))
+               return;
+
        if (n->working_links > 1) {
                pr_warn("Attempt to establish 3rd link to %x\n", n->addr);
                return;
@@ -356,28 +361,40 @@ static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
 }
 
 /**
- * tipc_node_link_down - handle loss of link
+ * tipc_node_link_up - handle addition of link
+ *
+ * Link becomes active (alone or shared) or standby, depending on its priority.
  */
-static void tipc_node_link_down(struct tipc_node *n, int bearer_id)
+static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
+                             struct sk_buff_head *xmitq)
 {
+       tipc_node_lock(n);
+       __tipc_node_link_up(n, bearer_id, xmitq);
+       tipc_node_unlock(n);
+}
+
+/**
+ * __tipc_node_link_down - handle loss of link
+ */
+static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
+                                 struct sk_buff_head *xmitq,
+                                 struct tipc_media_addr **maddr)
+{
+       struct tipc_link_entry *le = &n->links[*bearer_id];
        int *slot0 = &n->active_links[0];
        int *slot1 = &n->active_links[1];
-       struct tipc_media_addr *maddr = &n->links[bearer_id].maddr;
        int i, highest = 0;
        struct tipc_link *l, *_l, *tnl;
-       struct sk_buff_head xmitq;
 
-       l = n->links[bearer_id].link;
+       l = n->links[*bearer_id].link;
        if (!l || tipc_link_is_reset(l))
                return;
 
-       __skb_queue_head_init(&xmitq);
-
        n->working_links--;
        n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
-       n->link_id = l->peer_bearer_id << 16 | bearer_id;
+       n->link_id = l->peer_bearer_id << 16 | *bearer_id;
 
-       tipc_bearer_remove_dest(n->net, l->bearer_id, n->addr);
+       tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
 
        pr_debug("Lost link <%s> on network plane %c\n",
                 l->name, l->net_plane);
@@ -404,18 +421,40 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id)
 
        if (!tipc_node_is_up(n)) {
                tipc_link_reset(l);
-               node_lost_contact(n);
+               node_lost_contact(n, &le->inputq);
                return;
        }
 
        /* There is still a working link => initiate failover */
        tnl = node_active_link(n, 0);
-       tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
        n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
-       tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, &xmitq);
+       tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
        tipc_link_reset(l);
        tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
-       tipc_bearer_xmit(n->net, tnl->bearer_id, &xmitq, maddr);
+       tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
+       *maddr = &n->links[tnl->bearer_id].maddr;
+       *bearer_id = tnl->bearer_id;
+}
+
+static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
+{
+       struct tipc_link_entry *le = &n->links[bearer_id];
+       struct tipc_media_addr *maddr;
+       struct sk_buff_head xmitq;
+
+       __skb_queue_head_init(&xmitq);
+
+       tipc_node_lock(n);
+       __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
+       if (delete && le->link) {
+               kfree(le->link);
+               le->link = NULL;
+               n->link_cnt--;
+       }
+       tipc_node_unlock(n);
+
+       tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
+       tipc_sk_rcv(n->net, &le->inputq);
 }
 
 bool tipc_node_is_up(struct tipc_node *n)
@@ -437,7 +476,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
        bool sign_match = false;
        bool link_up = false;
        bool accept_addr = false;
-
+       bool reset = true;
        *dupl_addr = false;
        *respond = false;
 
@@ -460,6 +499,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 
        if (sign_match && addr_match && link_up) {
                /* All is fine. Do nothing. */
+               reset = false;
        } else if (sign_match && addr_match && !link_up) {
                /* Respond. The link will come up in due time */
                *respond = true;
@@ -531,29 +571,21 @@ void tipc_node_check_dest(struct net *net, u32 onode,
        }
        memcpy(&l->media_addr, maddr, sizeof(*maddr));
        memcpy(curr_maddr, maddr, sizeof(*maddr));
-       tipc_node_link_down(n, b->identity);
 exit:
        tipc_node_unlock(n);
+       if (reset)
+               tipc_node_link_down(n, b->identity, false);
        tipc_node_put(n);
 }
 
 void tipc_node_delete_links(struct net *net, int bearer_id)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct tipc_link *l;
        struct tipc_node *n;
 
        rcu_read_lock();
        list_for_each_entry_rcu(n, &tn->node_list, list) {
-               tipc_node_lock(n);
-               l = n->links[bearer_id].link;
-               if (l) {
-                       tipc_node_link_down(n, bearer_id);
-                       n->links[bearer_id].link = NULL;
-                       n->link_cnt--;
-               }
-               tipc_node_unlock(n);
-               kfree(l);
+               tipc_node_link_down(n, bearer_id, true);
        }
        rcu_read_unlock();
 }
@@ -561,19 +593,14 @@ void tipc_node_delete_links(struct net *net, int bearer_id)
 static void tipc_node_reset_links(struct tipc_node *n)
 {
        char addr_string[16];
-       u32 i;
-
-       tipc_node_lock(n);
+       int i;
 
        pr_warn("Resetting all links to %s\n",
                tipc_addr_string_fill(addr_string, n->addr));
 
        for (i = 0; i < MAX_BEARERS; i++) {
-               if (!n->links[i].link)
-                       continue;
-               tipc_node_link_down(n, i);
+               tipc_node_link_down(n, i, false);
        }
-       tipc_node_unlock(n);
 }
 
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
@@ -798,10 +825,12 @@ static void node_established_contact(struct tipc_node *n_ptr)
        tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
 }
 
-static void node_lost_contact(struct tipc_node *n_ptr)
+static void node_lost_contact(struct tipc_node *n_ptr,
+                             struct sk_buff_head *inputq)
 {
        char addr_string[16];
        struct tipc_sock_conn *conn, *safe;
+       struct tipc_link *l;
        struct list_head *conns = &n_ptr->conn_sks;
        struct sk_buff *skb;
        struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
@@ -827,14 +856,11 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 
        /* Abort any ongoing link failover */
        for (i = 0; i < MAX_BEARERS; i++) {
-               struct tipc_link *l_ptr = n_ptr->links[i].link;
-               if (!l_ptr)
-                       continue;
-               tipc_link_fsm_evt(l_ptr, LINK_FAILOVER_END_EVT);
-               kfree_skb(l_ptr->failover_reasm_skb);
-               l_ptr->failover_reasm_skb = NULL;
-               tipc_link_reset_fragments(l_ptr);
+               l = n_ptr->links[i].link;
+               if (l)
+                       tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
        }
+
        /* Prevent re-contact with node until cleanup is done */
        tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
 
@@ -848,7 +874,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
                                      conn->peer_node, conn->port,
                                      conn->peer_port, TIPC_ERR_NO_NODE);
                if (likely(skb)) {
-                       skb_queue_tail(n_ptr->inputq, skb);
+                       skb_queue_tail(inputq, skb);
                        n_ptr->action_flags |= TIPC_MSG_EVT;
                }
                list_del(&conn->list);
@@ -1025,9 +1051,9 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
                l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
                if (likely(l))
                        rc = tipc_link_xmit(l, list, &xmitq);
-               if (unlikely(rc == -ENOBUFS))
-                       tipc_node_link_down(n, bearer_id);
                tipc_node_unlock(n);
+               if (unlikely(rc == -ENOBUFS))
+                       tipc_node_link_down(n, bearer_id, false);
                tipc_node_put(n);
        }
        if (likely(!rc)) {
@@ -1081,8 +1107,8 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
        u16 rcv_nxt, syncpt, dlv_nxt;
        int state = n->state;
        struct tipc_link *l, *pl = NULL;
-       struct sk_buff_head;
-       int i;
+       struct tipc_media_addr *maddr;
+       int i, pb_id;
 
        l = n->links[bearer_id].link;
        if (!l)
@@ -1123,9 +1149,11 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
        /* Initiate or update failover mode if applicable */
        if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
                syncpt = oseqno + exp_pkts - 1;
-               if (pl && tipc_link_is_up(pl))
-                       tipc_node_link_down(n, pl->bearer_id);
-
+               if (pl && tipc_link_is_up(pl)) {
+                       pb_id = pl->bearer_id;
+                       __tipc_node_link_down(n, &pb_id, xmitq, &maddr);
+                       tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
+               }
                /* If pkts arrive out of order, use lowest calculated syncpt */
                if (less(syncpt, n->sync_point))
                        n->sync_point = syncpt;
@@ -1146,7 +1174,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
                syncpt = iseqno + exp_pkts - 1;
                if (!tipc_link_is_up(l)) {
                        tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
-                       tipc_node_link_up(n, bearer_id, xmitq);
+                       __tipc_node_link_up(n, bearer_id, xmitq);
                }
                if (n->state == SELF_UP_PEER_UP) {
                        n->sync_point = syncpt;
@@ -1224,7 +1252,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
        if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
                tipc_bclink_sync_state(n, hdr);
 
-       /* Release acked broadcast messages */
+       /* Release acked broadcast packets */
        if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
                tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
 
@@ -1233,14 +1261,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
                rc = tipc_link_rcv(le->link, skb, &xmitq);
                skb = NULL;
        }
+unlock:
+       tipc_node_unlock(n);
 
        if (unlikely(rc & TIPC_LINK_UP_EVT))
                tipc_node_link_up(n, bearer_id, &xmitq);
 
        if (unlikely(rc & TIPC_LINK_DOWN_EVT))
-               tipc_node_link_down(n, bearer_id);
-unlock:
-       tipc_node_unlock(n);
+               tipc_node_link_down(n, bearer_id, false);
 
        if (!skb_queue_empty(&le->inputq))
                tipc_sk_rcv(net, &le->inputq);