From 2312bf61ae365fdd6b9bfb24558a417859759447 Mon Sep 17 00:00:00 2001 From: Jon Paul Maloy Date: Thu, 19 Nov 2015 14:30:43 -0500 Subject: [PATCH] tipc: introduce per-link spinlock As a preparation to allow parallel links to work more independently from each other we introduce a per-link spinlock, to be stored in the struct nodes's link entry area. Since the node lock still is a regular spinlock there is no increase in parallellism at this stage. Reviewed-by: Ying Xue Signed-off-by: Jon Maloy Signed-off-by: David S. Miller --- net/tipc/link.c | 9 +++++---- net/tipc/node.c | 39 ++++++++++++++++++--------------------- net/tipc/node.h | 3 ++- 3 files changed, 25 insertions(+), 26 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index fa452fb5f34e..b5e895c6f1aa 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -1995,6 +1995,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info) struct tipc_node *node; struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1]; struct net *net = sock_net(skb->sk); + struct tipc_link_entry *le; if (!info->attrs[TIPC_NLA_LINK]) return -EINVAL; @@ -2020,17 +2021,17 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info) node = tipc_link_find_owner(net, link_name, &bearer_id); if (!node) return -EINVAL; - + le = &node->links[bearer_id]; tipc_node_lock(node); - - link = node->links[bearer_id].link; + spin_lock_bh(&le->lock); + link = le->link; if (!link) { tipc_node_unlock(node); return -EINVAL; } link_reset_statistics(link); - + spin_unlock_bh(&le->lock); tipc_node_unlock(node); return 0; diff --git a/net/tipc/node.c b/net/tipc/node.c index 932195258551..572063a0190e 100644 --- a/net/tipc/node.c +++ b/net/tipc/node.c @@ -339,11 +339,13 @@ static void tipc_node_timeout(unsigned long data) for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { tipc_node_lock(n); le = &n->links[bearer_id]; + spin_lock_bh(&le->lock); if (le->link) { /* Link tolerance may change asynchronously: */ tipc_node_calculate_timer(n, le->link); rc = tipc_link_timeout(le->link, &xmitq); } + spin_unlock_bh(&le->lock); tipc_node_unlock(n); tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); if (rc & TIPC_LINK_DOWN_EVT) @@ -654,6 +656,7 @@ void tipc_node_check_dest(struct net *net, u32 onode, if (n->state == NODE_FAILINGOVER) tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); le->link = l; + spin_lock_init(&le->lock); n->link_cnt++; tipc_node_calculate_timer(n, l); if (n->link_cnt == 1) @@ -1033,20 +1036,6 @@ msg_full: return -EMSGSIZE; } -static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, - int *bearer_id, - struct tipc_media_addr **maddr) -{ - int id = n->active_links[sel & 1]; - - if (unlikely(id < 0)) - return NULL; - - *bearer_id = id; - *maddr = &n->links[id].maddr; - return n->links[id].link; -} - /** * tipc_node_xmit() is the general link level function for message sending * @net: the applicable net namespace @@ -1059,26 +1048,32 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel, int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, int selector) { - struct tipc_link *l = NULL; + struct tipc_link_entry *le; struct tipc_node *n; struct sk_buff_head xmitq; - struct tipc_media_addr *maddr; - int bearer_id; + struct tipc_media_addr *maddr = NULL; + int bearer_id = -1; int rc = -EHOSTUNREACH; __skb_queue_head_init(&xmitq); n = tipc_node_find(net, dnode); if (likely(n)) { tipc_node_lock(n); - l = tipc_node_select_link(n, selector, &bearer_id, &maddr); - if (likely(l)) - rc = tipc_link_xmit(l, list, &xmitq); + bearer_id = n->active_links[selector & 1]; + if (bearer_id >= 0) { + le = &n->links[bearer_id]; + maddr = &le->maddr; + spin_lock_bh(&le->lock); + if (likely(le->link)) + rc = tipc_link_xmit(le->link, list, &xmitq); + spin_unlock_bh(&le->lock); + } tipc_node_unlock(n); if (unlikely(rc == -ENOBUFS)) tipc_node_link_down(n, bearer_id, false); tipc_node_put(n); } - if (likely(!rc)) { + if (likely(!skb_queue_empty(&xmitq))) { tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); return 0; } @@ -1374,7 +1369,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b) /* Check and if necessary update node state */ if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { + spin_lock_bh(&le->lock); rc = tipc_link_rcv(le->link, skb, &xmitq); + spin_unlock_bh(&le->lock); skb = NULL; } unlock: diff --git a/net/tipc/node.h b/net/tipc/node.h index dd79e9742bd6..8784907486c0 100644 --- a/net/tipc/node.h +++ b/net/tipc/node.h @@ -69,6 +69,7 @@ enum { struct tipc_link_entry { struct tipc_link *link; + spinlock_t lock; /* per-link */ u32 mtu; struct sk_buff_head inputq; struct tipc_media_addr maddr; @@ -86,7 +87,7 @@ struct tipc_bclink_entry { * struct tipc_node - TIPC node structure * @addr: network address of node * @ref: reference counter to node object - * @lock: spinlock governing access to structure + * @lock: rwlock governing access to structure * @net: the applicable net namespace * @hash: links to adjacent nodes in unsorted hash chain * @inputq: pointer to input queue containing messages for msg event -- 2.20.1