tipc: convert node lock to rwlock
[cascardo/linux.git] / net / tipc / node.c
index 20cddec..47d5f84 100644 (file)
@@ -141,10 +141,63 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
        return NULL;
 }
 
+void tipc_node_read_lock(struct tipc_node *n)
+{
+       read_lock_bh(&n->lock);
+}
+
+void tipc_node_read_unlock(struct tipc_node *n)
+{
+       read_unlock_bh(&n->lock);
+}
+
+static void tipc_node_write_lock(struct tipc_node *n)
+{
+       write_lock_bh(&n->lock);
+}
+
+static void tipc_node_write_unlock(struct tipc_node *n)
+{
+       struct net *net = n->net;
+       u32 addr = 0;
+       u32 flags = n->action_flags;
+       u32 link_id = 0;
+       struct list_head *publ_list;
+
+       if (likely(!flags)) {
+               write_unlock_bh(&n->lock);
+               return;
+       }
+
+       addr = n->addr;
+       link_id = n->link_id;
+       publ_list = &n->publ_list;
+
+       n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
+                            TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
+
+       write_unlock_bh(&n->lock);
+
+       if (flags & TIPC_NOTIFY_NODE_DOWN)
+               tipc_publ_notify(net, publ_list, addr);
+
+       if (flags & TIPC_NOTIFY_NODE_UP)
+               tipc_named_node_up(net, addr);
+
+       if (flags & TIPC_NOTIFY_LINK_UP)
+               tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
+                                    TIPC_NODE_SCOPE, link_id, addr);
+
+       if (flags & TIPC_NOTIFY_LINK_DOWN)
+               tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
+                                     link_id, addr);
+}
+
 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 {
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_node *n_ptr, *temp_node;
+       int i;
 
        spin_lock_bh(&tn->node_list_lock);
        n_ptr = tipc_node_find(net, addr);
@@ -159,7 +212,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
        n_ptr->net = net;
        n_ptr->capabilities = capabilities;
        kref_init(&n_ptr->kref);
-       spin_lock_init(&n_ptr->lock);
+       rwlock_init(&n_ptr->lock);
        INIT_HLIST_NODE(&n_ptr->hash);
        INIT_LIST_HEAD(&n_ptr->list);
        INIT_LIST_HEAD(&n_ptr->publ_list);
@@ -168,6 +221,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
        skb_queue_head_init(&n_ptr->bc_entry.inputq1);
        __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
        skb_queue_head_init(&n_ptr->bc_entry.inputq2);
+       for (i = 0; i < MAX_BEARERS; i++)
+               spin_lock_init(&n_ptr->links[i].lock);
        hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
        list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
                if (n_ptr->addr < temp_node->addr)
@@ -234,6 +289,42 @@ void tipc_node_stop(struct net *net)
        spin_unlock_bh(&tn->node_list_lock);
 }
 
+void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
+{
+       struct tipc_node *n;
+
+       if (in_own_node(net, addr))
+               return;
+
+       n = tipc_node_find(net, addr);
+       if (!n) {
+               pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
+               return;
+       }
+       tipc_node_write_lock(n);
+       list_add_tail(subscr, &n->publ_list);
+       tipc_node_write_unlock(n);
+       tipc_node_put(n);
+}
+
+void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
+{
+       struct tipc_node *n;
+
+       if (in_own_node(net, addr))
+               return;
+
+       n = tipc_node_find(net, addr);
+       if (!n) {
+               pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
+               return;
+       }
+       tipc_node_write_lock(n);
+       list_del_init(subscr);
+       tipc_node_write_unlock(n);
+       tipc_node_put(n);
+}
+
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 {
        struct tipc_node *node;
@@ -257,9 +348,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
        conn->port = port;
        conn->peer_port = peer_port;
 
-       tipc_node_lock(node);
+       tipc_node_write_lock(node);
        list_add_tail(&conn->list, &node->conn_sks);
-       tipc_node_unlock(node);
+       tipc_node_write_unlock(node);
 exit:
        tipc_node_put(node);
        return err;
@@ -277,14 +368,14 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
        if (!node)
                return;
 
-       tipc_node_lock(node);
+       tipc_node_write_lock(node);
        list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
                if (port != conn->port)
                        continue;
                list_del(&conn->list);
                kfree(conn);
        }
-       tipc_node_unlock(node);
+       tipc_node_write_unlock(node);
        tipc_node_put(node);
 }
 
@@ -301,14 +392,16 @@ static void tipc_node_timeout(unsigned long data)
        __skb_queue_head_init(&xmitq);
 
        for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-               tipc_node_lock(n);
+               tipc_node_read_lock(n);
                le = &n->links[bearer_id];
+               spin_lock_bh(&le->lock);
                if (le->link) {
                        /* Link tolerance may change asynchronously: */
                        tipc_node_calculate_timer(n, le->link);
                        rc = tipc_link_timeout(le->link, &xmitq);
                }
-               tipc_node_unlock(n);
+               spin_unlock_bh(&le->lock);
+               tipc_node_read_unlock(n);
                tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
                if (rc & TIPC_LINK_DOWN_EVT)
                        tipc_node_link_down(n, bearer_id, false);
@@ -387,9 +480,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
                              struct sk_buff_head *xmitq)
 {
-       tipc_node_lock(n);
+       tipc_node_write_lock(n);
        __tipc_node_link_up(n, bearer_id, xmitq);
-       tipc_node_unlock(n);
+       tipc_node_write_unlock(n);
 }
 
 /**
@@ -478,7 +571,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 
        __skb_queue_head_init(&xmitq);
 
-       tipc_node_lock(n);
+       tipc_node_write_lock(n);
        if (!tipc_link_is_establishing(l)) {
                __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
                if (delete) {
@@ -490,7 +583,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
                /* Defuse pending tipc_node_link_up() */
                tipc_link_fsm_evt(l, LINK_RESET_EVT);
        }
-       tipc_node_unlock(n);
+       tipc_node_write_unlock(n);
        tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
        tipc_sk_rcv(n->net, &le->inputq);
 }
@@ -523,7 +616,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
        if (!n)
                return;
 
-       tipc_node_lock(n);
+       tipc_node_write_lock(n);
 
        le = &n->links[b->identity];
 
@@ -626,7 +719,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
        }
        memcpy(&le->maddr, maddr, sizeof(*maddr));
 exit:
-       tipc_node_unlock(n);
+       tipc_node_write_unlock(n);
        if (reset && !tipc_link_is_reset(l))
                tipc_node_link_down(n, b->identity, false);
        tipc_node_put(n);
@@ -834,24 +927,6 @@ illegal_evt:
        pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
 }
 
-bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
-{
-       int state = n->state;
-
-       if (likely(state == SELF_UP_PEER_UP))
-               return true;
-
-       if (state == SELF_LEAVING_PEER_DOWN)
-               return false;
-
-       if (state == SELF_DOWN_PEER_LEAVING) {
-               if (msg_peer_node_is_up(hdr))
-                       return false;
-       }
-
-       return true;
-}
-
 static void node_lost_contact(struct tipc_node *n,
                              struct sk_buff_head *inputq)
 {
@@ -913,56 +988,18 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
        if (bearer_id >= MAX_BEARERS)
                goto exit;
 
-       tipc_node_lock(node);
+       tipc_node_read_lock(node);
        link = node->links[bearer_id].link;
        if (link) {
                strncpy(linkname, link->name, len);
                err = 0;
        }
 exit:
-       tipc_node_unlock(node);
+       tipc_node_read_unlock(node);
        tipc_node_put(node);
        return err;
 }
 
-void tipc_node_unlock(struct tipc_node *node)
-{
-       struct net *net = node->net;
-       u32 addr = 0;
-       u32 flags = node->action_flags;
-       u32 link_id = 0;
-       struct list_head *publ_list;
-
-       if (likely(!flags)) {
-               spin_unlock_bh(&node->lock);
-               return;
-       }
-
-       addr = node->addr;
-       link_id = node->link_id;
-       publ_list = &node->publ_list;
-
-       node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
-                               TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
-
-       spin_unlock_bh(&node->lock);
-
-       if (flags & TIPC_NOTIFY_NODE_DOWN)
-               tipc_publ_notify(net, publ_list, addr);
-
-       if (flags & TIPC_NOTIFY_NODE_UP)
-               tipc_named_node_up(net, addr);
-
-       if (flags & TIPC_NOTIFY_LINK_UP)
-               tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
-                                    TIPC_NODE_SCOPE, link_id, addr);
-
-       if (flags & TIPC_NOTIFY_LINK_DOWN)
-               tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
-                                     link_id, addr);
-
-}
-
 /* Caller should hold node lock for the passed node */
 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 {
@@ -997,20 +1034,6 @@ msg_full:
        return -EMSGSIZE;
 }
 
-static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
-                                              int *bearer_id,
-                                              struct tipc_media_addr **maddr)
-{
-       int id = n->active_links[sel & 1];
-
-       if (unlikely(id < 0))
-               return NULL;
-
-       *bearer_id = id;
-       *maddr = &n->links[id].maddr;
-       return n->links[id].link;
-}
-
 /**
  * tipc_node_xmit() is the general link level function for message sending
  * @net: the applicable net namespace
@@ -1023,34 +1046,38 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
                   u32 dnode, int selector)
 {
-       struct tipc_link *l = NULL;
+       struct tipc_link_entry *le = NULL;
        struct tipc_node *n;
        struct sk_buff_head xmitq;
-       struct tipc_media_addr *maddr;
-       int bearer_id;
+       int bearer_id = -1;
        int rc = -EHOSTUNREACH;
 
        __skb_queue_head_init(&xmitq);
        n = tipc_node_find(net, dnode);
        if (likely(n)) {
-               tipc_node_lock(n);
-               l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
-               if (likely(l))
-                       rc = tipc_link_xmit(l, list, &xmitq);
-               tipc_node_unlock(n);
+               tipc_node_read_lock(n);
+               bearer_id = n->active_links[selector & 1];
+               if (bearer_id >= 0) {
+                       le = &n->links[bearer_id];
+                       spin_lock_bh(&le->lock);
+                       rc = tipc_link_xmit(le->link, list, &xmitq);
+                       spin_unlock_bh(&le->lock);
+               }
+               tipc_node_read_unlock(n);
+               if (likely(!skb_queue_empty(&xmitq))) {
+                       tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+                       return 0;
+               }
                if (unlikely(rc == -ENOBUFS))
                        tipc_node_link_down(n, bearer_id, false);
                tipc_node_put(n);
+               return rc;
        }
-       if (likely(!rc)) {
-               tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
-               return 0;
-       }
-       if (likely(in_own_node(net, dnode))) {
-               tipc_sk_rcv(net, list);
-               return 0;
-       }
-       return rc;
+
+       if (unlikely(!in_own_node(net, dnode)))
+               return rc;
+       tipc_sk_rcv(net, list);
+       return 0;
 }
 
 /* tipc_node_xmit_skb(): send single buffer to destination
@@ -1075,6 +1102,30 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
        return 0;
 }
 
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
+{
+       struct sk_buff *txskb;
+       struct tipc_node *n;
+       u32 dst;
+
+       rcu_read_lock();
+       list_for_each_entry_rcu(n, tipc_nodes(net), list) {
+               dst = n->addr;
+               if (in_own_node(net, dst))
+                       continue;
+               if (!tipc_node_is_up(n))
+                       continue;
+               txskb = pskb_copy(skb, GFP_ATOMIC);
+               if (!txskb)
+                       break;
+               msg_set_destnode(buf_msg(txskb), dst);
+               tipc_node_xmit_skb(net, txskb, dst, 0);
+       }
+       rcu_read_unlock();
+
+       kfree_skb(skb);
+}
+
 /**
  * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
  * @net: the applicable net namespace
@@ -1116,9 +1167,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 
        /* Broadcast ACKs are sent on a unicast link */
        if (rc & TIPC_LINK_SND_BC_ACK) {
-               tipc_node_lock(n);
+               tipc_node_read_lock(n);
                tipc_link_build_ack_msg(le->link, &xmitq);
-               tipc_node_unlock(n);
+               tipc_node_read_unlock(n);
        }
 
        if (!skb_queue_empty(&xmitq))
@@ -1174,7 +1225,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
                }
        }
 
-       /* Update node accesibility if applicable */
+       /* Check and update node accesibility if applicable */
        if (state == SELF_UP_PEER_COMING) {
                if (!tipc_link_is_up(l))
                        return true;
@@ -1187,8 +1238,12 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
                if (msg_peer_node_is_up(hdr))
                        return false;
                tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
+               return true;
        }
 
+       if (state == SELF_LEAVING_PEER_DOWN)
+               return false;
+
        /* Ignore duplicate packets */
        if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
                return true;
@@ -1232,12 +1287,10 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
                        tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
                        tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
                }
-               if (less(syncpt, n->sync_point))
-                       n->sync_point = syncpt;
        }
 
        /* Open tunnel link when parallel link reaches synch point */
-       if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) {
+       if (n->state == NODE_SYNCHING) {
                if (tipc_link_is_synching(l)) {
                        tnl = l;
                } else {
@@ -1307,19 +1360,29 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
        else if (unlikely(n->bc_entry.link->acked != bc_ack))
                tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
-       tipc_node_lock(n);
-
-       /* Is reception permitted at the moment ? */
-       if (!tipc_node_filter_pkt(n, hdr))
-               goto unlock;
-
-       /* Check and if necessary update node state */
-       if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
-               rc = tipc_link_rcv(le->link, skb, &xmitq);
-               skb = NULL;
+       /* Receive packet directly if conditions permit */
+       tipc_node_read_lock(n);
+       if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
+               spin_lock_bh(&le->lock);
+               if (le->link) {
+                       rc = tipc_link_rcv(le->link, skb, &xmitq);
+                       skb = NULL;
+               }
+               spin_unlock_bh(&le->lock);
+       }
+       tipc_node_read_unlock(n);
+
+       /* Check/update node state before receiving */
+       if (unlikely(skb)) {
+               tipc_node_write_lock(n);
+               if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+                       if (le->link) {
+                               rc = tipc_link_rcv(le->link, skb, &xmitq);
+                               skb = NULL;
+                       }
+               }
+               tipc_node_write_unlock(n);
        }
-unlock:
-       tipc_node_unlock(n);
 
        if (unlikely(rc & TIPC_LINK_UP_EVT))
                tipc_node_link_up(n, bearer_id, &xmitq);
@@ -1384,15 +1447,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
                                continue;
                }
 
-               tipc_node_lock(node);
+               tipc_node_read_lock(node);
                err = __tipc_nl_add_node(&msg, node);
                if (err) {
                        last_addr = node->addr;
-                       tipc_node_unlock(node);
+                       tipc_node_read_unlock(node);
                        goto out;
                }
 
-               tipc_node_unlock(node);
+               tipc_node_read_unlock(node);
        }
        done = 1;
 out: