Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/s390/linux
[cascardo/linux.git] / net / tipc / socket.c
index 4731cad..f73e975 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/socket.c: TIPC socket API
  *
- * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
+ * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
  * All rights reserved.
  *
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <linux/rhashtable.h>
+#include <linux/jhash.h>
 #include "core.h"
 #include "name_table.h"
 #include "node.h"
 #include "link.h"
-#include <linux/export.h>
-#include "config.h"
+#include "name_distr.h"
 #include "socket.h"
 
-#define SS_LISTENING   -1      /* socket is listening */
-#define SS_READY       -2      /* socket is connectionless */
+#define SS_LISTENING           -1      /* socket is listening */
+#define SS_READY               -2      /* socket is connectionless */
 
-#define CONN_TIMEOUT_DEFAULT  8000     /* default connect timeout = 8s */
-#define CONN_PROBING_INTERVAL 3600000  /* [ms] => 1 h */
-#define TIPC_FWD_MSG         1
-#define TIPC_CONN_OK          0
-#define TIPC_CONN_PROBING     1
+#define CONN_TIMEOUT_DEFAULT   8000    /* default connect timeout = 8s */
+#define CONN_PROBING_INTERVAL  msecs_to_jiffies(3600000)  /* [ms] => 1 h */
+#define TIPC_FWD_MSG           1
+#define TIPC_CONN_OK           0
+#define TIPC_CONN_PROBING      1
+#define TIPC_MAX_PORT          0xffffffff
+#define TIPC_MIN_PORT          1
 
 /**
  * struct tipc_sock - TIPC socket structure
  * @conn_instance: TIPC instance used when connection was established
  * @published: non-zero if port has one or more associated names
  * @max_pkt: maximum packet size "hint" used when building messages sent by port
- * @ref: unique reference to port in TIPC object registry
+ * @portid: unique port identity in TIPC socket hash table
  * @phdr: preformatted message header used when sending messages
  * @port_list: adjacent ports in TIPC's global list of ports
  * @publications: list of publications for port
  * @pub_count: total # of publications port has made during its lifetime
  * @probing_state:
- * @probing_interval:
- * @timer:
- * @port: port - interacts with 'sk' and with the rest of the TIPC stack
- * @peer_name: the peer of the connection, if any
+ * @probing_intv: interval between connection probes, in jiffies
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
  * @link_cong: non-zero if owner must sleep because of link congestion
  * @sent_unacked: # messages sent by socket, and not yet acked by peer
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
+ * @node: hash table node
+ * @rcu: rcu struct for tipc_sock
  */
 struct tipc_sock {
        struct sock sk;
@@ -82,19 +84,20 @@ struct tipc_sock {
        u32 conn_instance;
        int published;
        u32 max_pkt;
-       u32 ref;
+       u32 portid;
        struct tipc_msg phdr;
        struct list_head sock_list;
        struct list_head publications;
        u32 pub_count;
        u32 probing_state;
-       u32 probing_interval;
-       struct timer_list timer;
+       unsigned long probing_intv;
        uint conn_timeout;
        atomic_t dupl_rcvcnt;
        bool link_cong;
        uint sent_unacked;
        uint rcv_unacked;
+       struct rhash_head node;
+       struct rcu_head rcu;
 };
 
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
@@ -103,16 +106,14 @@ static void tipc_write_space(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
-static void tipc_sk_timeout(unsigned long ref);
+static void tipc_sk_timeout(unsigned long data);
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                            struct tipc_name_seq const *seq);
-static u32 tipc_sk_ref_acquire(struct tipc_sock *tsk);
-static void tipc_sk_ref_discard(u32 ref);
-static struct tipc_sock *tipc_sk_get(u32 ref);
-static struct tipc_sock *tipc_sk_get_next(u32 *ref);
-static void tipc_sk_put(struct tipc_sock *tsk);
+static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
+static int tipc_sk_insert(struct tipc_sock *tsk);
+static void tipc_sk_remove(struct tipc_sock *tsk);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -174,6 +175,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
  *   - port reference
  */
 
+static u32 tsk_own_node(struct tipc_sock *tsk)
+{
+       return msg_prevnode(&tsk->phdr);
+}
+
 static u32 tsk_peer_node(struct tipc_sock *tsk)
 {
        return msg_destnode(&tsk->phdr);
@@ -246,10 +252,11 @@ static void tsk_rej_rx_queue(struct sock *sk)
 {
        struct sk_buff *skb;
        u32 dnode;
+       u32 own_node = tsk_own_node(tipc_sk(sk));
 
        while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
-               if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
-                       tipc_link_xmit_skb(skb, dnode, 0);
+               if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
+                       tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
        }
 }
 
@@ -260,6 +267,7 @@ static void tsk_rej_rx_queue(struct sock *sk)
  */
 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
 {
+       struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
        u32 peer_port = tsk_peer_port(tsk);
        u32 orig_node;
        u32 peer_node;
@@ -276,10 +284,10 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
        if (likely(orig_node == peer_node))
                return true;
 
-       if (!orig_node && (peer_node == tipc_own_addr))
+       if (!orig_node && (peer_node == tn->own_addr))
                return true;
 
-       if (!peer_node && (orig_node == tipc_own_addr))
+       if (!peer_node && (orig_node == tn->own_addr))
                return true;
 
        return false;
@@ -300,12 +308,12 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
 static int tipc_sk_create(struct net *net, struct socket *sock,
                          int protocol, int kern)
 {
+       struct tipc_net *tn;
        const struct proto_ops *ops;
        socket_state state;
        struct sock *sk;
        struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       u32 ref;
 
        /* Validate arguments */
        if (unlikely(protocol != 0))
@@ -339,24 +347,23 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                return -ENOMEM;
 
        tsk = tipc_sk(sk);
-       ref = tipc_sk_ref_acquire(tsk);
-       if (!ref) {
-               pr_warn("Socket create failed; reference table exhausted\n");
-               return -ENOMEM;
-       }
        tsk->max_pkt = MAX_PKT_DEFAULT;
-       tsk->ref = ref;
        INIT_LIST_HEAD(&tsk->publications);
        msg = &tsk->phdr;
-       tipc_msg_init(msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
+       tn = net_generic(sock_net(sk), tipc_net_id);
+       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
                      NAMED_H_SIZE, 0);
-       msg_set_origport(msg, ref);
 
        /* Finish initializing socket data structures */
        sock->ops = ops;
        sock->state = state;
        sock_init_data(sock, sk);
-       k_init_timer(&tsk->timer, (Handler)tipc_sk_timeout, ref);
+       if (tipc_sk_insert(tsk)) {
+               pr_warn("Socket create failed; port number exhausted\n");
+               return -EINVAL;
+       }
+       msg_set_origport(msg, tsk->portid);
+       setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
        sk->sk_backlog_rcv = tipc_backlog_rcv;
        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
        sk->sk_data_ready = tipc_data_ready;
@@ -384,7 +391,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
  *
  * Returns 0 on success, errno otherwise
  */
-int tipc_sock_create_local(int type, struct socket **res)
+int tipc_sock_create_local(struct net *net, int type, struct socket **res)
 {
        int rc;
 
@@ -393,7 +400,7 @@ int tipc_sock_create_local(int type, struct socket **res)
                pr_err("Failed to create kernel socket\n");
                return rc;
        }
-       tipc_sk_create(&init_net, *res, 0, 1);
+       tipc_sk_create(net, *res, 0, 1);
 
        return 0;
 }
@@ -442,6 +449,13 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
        return ret;
 }
 
+static void tipc_sk_callback(struct rcu_head *head)
+{
+       struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
+
+       sock_put(&tsk->sk);
+}
+
 /**
  * tipc_release - destroy a TIPC socket
  * @sock: socket to destroy
@@ -461,9 +475,10 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
 static int tipc_release(struct socket *sock)
 {
        struct sock *sk = sock->sk;
+       struct net *net;
        struct tipc_sock *tsk;
        struct sk_buff *skb;
-       u32 dnode;
+       u32 dnode, probing_state;
 
        /*
         * Exit if socket isn't fully initialized (occurs when a failed accept()
@@ -472,6 +487,7 @@ static int tipc_release(struct socket *sock)
        if (sk == NULL)
                return 0;
 
+       net = sock_net(sk);
        tsk = tipc_sk(sk);
        lock_sock(sk);
 
@@ -491,26 +507,29 @@ static int tipc_release(struct socket *sock)
                            (sock->state == SS_CONNECTED)) {
                                sock->state = SS_DISCONNECTING;
                                tsk->connected = 0;
-                               tipc_node_remove_conn(dnode, tsk->ref);
+                               tipc_node_remove_conn(net, dnode, tsk->portid);
                        }
-                       if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
-                               tipc_link_xmit_skb(skb, dnode, 0);
+                       if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
+                                            TIPC_ERR_NO_PORT))
+                               tipc_link_xmit_skb(net, skb, dnode, 0);
                }
        }
 
        tipc_sk_withdraw(tsk, 0, NULL);
-       tipc_sk_ref_discard(tsk->ref);
-       k_cancel_timer(&tsk->timer);
+       probing_state = tsk->probing_state;
+       if (del_timer_sync(&sk->sk_timer) &&
+           probing_state != TIPC_CONN_PROBING)
+               sock_put(sk);
+       tipc_sk_remove(tsk);
        if (tsk->connected) {
-               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
-                                     SHORT_H_SIZE, 0, dnode, tipc_own_addr,
-                                     tsk_peer_port(tsk),
-                                     tsk->ref, TIPC_ERR_NO_PORT);
+               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
+                                     TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
+                                     tsk_own_node(tsk), tsk_peer_port(tsk),
+                                     tsk->portid, TIPC_ERR_NO_PORT);
                if (skb)
-                       tipc_link_xmit_skb(skb, dnode, tsk->ref);
-               tipc_node_remove_conn(dnode, tsk->ref);
+                       tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+               tipc_node_remove_conn(net, dnode, tsk->portid);
        }
-       k_term_timer(&tsk->timer);
 
        /* Discard any remaining (connection-based) messages in receive queue */
        __skb_queue_purge(&sk->sk_receive_queue);
@@ -518,7 +537,8 @@ static int tipc_release(struct socket *sock)
        /* Reject any messages that accumulated in backlog queue */
        sock->state = SS_DISCONNECTING;
        release_sock(sk);
-       sock_put(sk);
+
+       call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;
 
        return 0;
@@ -602,6 +622,7 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
 {
        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
        struct tipc_sock *tsk = tipc_sk(sock->sk);
+       struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
 
        memset(addr, 0, sizeof(*addr));
        if (peer) {
@@ -611,8 +632,8 @@ static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
                addr->addr.id.ref = tsk_peer_port(tsk);
                addr->addr.id.node = tsk_peer_node(tsk);
        } else {
-               addr->addr.id.ref = tsk->ref;
-               addr->addr.id.node = tipc_own_addr;
+               addr->addr.id.ref = tsk->portid;
+               addr->addr.id.node = tn->own_addr;
        }
 
        *uaddr_len = sizeof(*addr);
@@ -711,8 +732,11 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
                          struct msghdr *msg, size_t dsz, long timeo)
 {
        struct sock *sk = sock->sk;
-       struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
-       struct sk_buff_head head;
+       struct tipc_sock *tsk = tipc_sk(sk);
+       struct net *net = sock_net(sk);
+       struct tipc_msg *mhdr = &tsk->phdr;
+       struct sk_buff_head *pktchain = &sk->sk_write_queue;
+       struct iov_iter save = msg->msg_iter;
        uint mtu;
        int rc;
 
@@ -727,83 +751,97 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 
 new_mtu:
        mtu = tipc_bclink_get_mtu();
-       __skb_queue_head_init(&head);
-       rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
+       rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
        if (unlikely(rc < 0))
                return rc;
 
        do {
-               rc = tipc_bclink_xmit(&head);
+               rc = tipc_bclink_xmit(net, pktchain);
                if (likely(rc >= 0)) {
                        rc = dsz;
                        break;
                }
-               if (rc == -EMSGSIZE)
+               if (rc == -EMSGSIZE) {
+                       msg->msg_iter = save;
                        goto new_mtu;
+               }
                if (rc != -ELINKCONG)
                        break;
                tipc_sk(sk)->link_cong = 1;
                rc = tipc_wait_for_sndmsg(sock, &timeo);
                if (rc)
-                       __skb_queue_purge(&head);
+                       __skb_queue_purge(pktchain);
        } while (!rc);
        return rc;
 }
 
-/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+/**
+ * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
+ * @arrvq: queue with arriving messages, to be cloned after destination lookup
+ * @inputq: queue with cloned messages, delivered to socket after dest lookup
+ *
+ * Multi-threaded: parallel calls with reference to same queues may occur
  */
-void tipc_sk_mcast_rcv(struct sk_buff *buf)
+void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
+                      struct sk_buff_head *inputq)
 {
-       struct tipc_msg *msg = buf_msg(buf);
-       struct tipc_port_list dports = {0, NULL, };
-       struct tipc_port_list *item;
-       struct sk_buff *b;
-       uint i, last, dst = 0;
+       struct tipc_msg *msg;
+       struct tipc_plist dports;
+       u32 portid;
        u32 scope = TIPC_CLUSTER_SCOPE;
-
-       if (in_own_node(msg_orignode(msg)))
-               scope = TIPC_NODE_SCOPE;
-
-       /* Create destination port list: */
-       tipc_nametbl_mc_translate(msg_nametype(msg),
-                                 msg_namelower(msg),
-                                 msg_nameupper(msg),
-                                 scope,
-                                 &dports);
-       last = dports.count;
-       if (!last) {
-               kfree_skb(buf);
-               return;
-       }
-
-       for (item = &dports; item; item = item->next) {
-               for (i = 0; i < PLSIZE && ++dst <= last; i++) {
-                       b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
-                       if (!b) {
-                               pr_warn("Failed do clone mcast rcv buffer\n");
+       struct sk_buff_head tmpq;
+       uint hsz;
+       struct sk_buff *skb, *_skb;
+
+       __skb_queue_head_init(&tmpq);
+       tipc_plist_init(&dports);
+
+       skb = tipc_skb_peek(arrvq, &inputq->lock);
+       for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
+               msg = buf_msg(skb);
+               hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+
+               if (in_own_node(net, msg_orignode(msg)))
+                       scope = TIPC_NODE_SCOPE;
+
+               /* Create destination port list and message clones: */
+               tipc_nametbl_mc_translate(net,
+                                         msg_nametype(msg), msg_namelower(msg),
+                                         msg_nameupper(msg), scope, &dports);
+               portid = tipc_plist_pop(&dports);
+               for (; portid; portid = tipc_plist_pop(&dports)) {
+                       _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+                       if (_skb) {
+                               msg_set_destport(buf_msg(_skb), portid);
+                               __skb_queue_tail(&tmpq, _skb);
                                continue;
                        }
-                       msg_set_destport(msg, item->ports[i]);
-                       tipc_sk_rcv(b);
+                       pr_warn("Failed to clone mcast rcv buffer\n");
                }
+               /* Append to inputq if not already done by other thread */
+               spin_lock_bh(&inputq->lock);
+               if (skb_peek(arrvq) == skb) {
+                       skb_queue_splice_tail_init(&tmpq, inputq);
+                       kfree_skb(__skb_dequeue(arrvq));
+               }
+               spin_unlock_bh(&inputq->lock);
+               __skb_queue_purge(&tmpq);
+               kfree_skb(skb);
        }
-       tipc_port_list_free(&dports);
+       tipc_sk_rcv(net, inputq);
 }
 
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
- * @dnode: node to send response message to, if any
- * @buf: buffer containing protocol message
- * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
- * (CONN_PROBE_REPLY) message should be forwarded.
+ * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
  */
-static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
-                            struct sk_buff *buf)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
 {
-       struct tipc_msg *msg = buf_msg(buf);
+       struct tipc_msg *msg = buf_msg(*skb);
        int conn_cong;
-
+       u32 dnode;
+       u32 own_node = tsk_own_node(tsk);
        /* Ignore if connection cannot be validated: */
        if (!tsk_peer_msg(tsk, msg))
                goto exit;
@@ -816,15 +854,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
                if (conn_cong)
                        tsk->sk.sk_write_space(&tsk->sk);
        } else if (msg_type(msg) == CONN_PROBE) {
-               if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
-                       return TIPC_OK;
-               msg_set_type(msg, CONN_PROBE_REPLY);
-               return TIPC_FWD_MSG;
+               if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
+                       msg_set_type(msg, CONN_PROBE_REPLY);
+                       return;
+               }
        }
        /* Do nothing if msg_type() == CONN_PROBE_REPLY */
 exit:
-       kfree_skb(buf);
-       return TIPC_OK;
+       kfree_skb(*skb);
+       *skb = NULL;
 }
 
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
@@ -872,11 +910,13 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
+       struct net *net = sock_net(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
        u32 dnode, dport;
-       struct sk_buff_head head;
+       struct sk_buff_head *pktchain = &sk->sk_write_queue;
        struct sk_buff *skb;
        struct tipc_name_seq *seq = &dest->addr.nameseq;
+       struct iov_iter save;
        u32 mtu;
        long timeo;
        int rc;
@@ -929,7 +969,7 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
                msg_set_nametype(mhdr, type);
                msg_set_nameinst(mhdr, inst);
                msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
-               dport = tipc_nametbl_translate(type, inst, &dnode);
+               dport = tipc_nametbl_translate(net, type, inst, &dnode);
                msg_set_destnode(mhdr, dnode);
                msg_set_destport(mhdr, dport);
                if (unlikely(!dport && !dnode)) {
@@ -945,31 +985,33 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
                msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
        }
 
+       save = m->msg_iter;
 new_mtu:
-       mtu = tipc_node_get_mtu(dnode, tsk->ref);
-       __skb_queue_head_init(&head);
-       rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
+       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
+       rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
        if (rc < 0)
                goto exit;
 
        do {
-               skb = skb_peek(&head);
+               skb = skb_peek(pktchain);
                TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_link_xmit(&head, dnode, tsk->ref);
+               rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
                if (likely(rc >= 0)) {
                        if (sock->state != SS_READY)
                                sock->state = SS_CONNECTING;
                        rc = dsz;
                        break;
                }
-               if (rc == -EMSGSIZE)
+               if (rc == -EMSGSIZE) {
+                       m->msg_iter = save;
                        goto new_mtu;
+               }
                if (rc != -ELINKCONG)
                        break;
                tsk->link_cong = 1;
                rc = tipc_wait_for_sndmsg(sock, &timeo);
                if (rc)
-                       __skb_queue_purge(&head);
+                       __skb_queue_purge(pktchain);
        } while (!rc);
 exit:
        if (iocb)
@@ -1024,15 +1066,17 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
                            struct msghdr *m, size_t dsz)
 {
        struct sock *sk = sock->sk;
+       struct net *net = sock_net(sk);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff_head head;
+       struct sk_buff_head *pktchain = &sk->sk_write_queue;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-       u32 ref = tsk->ref;
+       u32 portid = tsk->portid;
        int rc = -EINVAL;
        long timeo;
        u32 dnode;
        uint mtu, send, sent = 0;
+       struct iov_iter save;
 
        /* Handle implied connection establishment */
        if (unlikely(dest)) {
@@ -1059,15 +1103,15 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
        dnode = tsk_peer_node(tsk);
 
 next:
+       save = m->msg_iter;
        mtu = tsk->max_pkt;
        send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-       __skb_queue_head_init(&head);
-       rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
+       rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
        if (unlikely(rc < 0))
                goto exit;
        do {
                if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_link_xmit(&head, dnode, ref);
+                       rc = tipc_link_xmit(net, pktchain, dnode, portid);
                        if (likely(!rc)) {
                                tsk->sent_unacked++;
                                sent += send;
@@ -1076,7 +1120,9 @@ next:
                                goto next;
                        }
                        if (rc == -EMSGSIZE) {
-                               tsk->max_pkt = tipc_node_get_mtu(dnode, ref);
+                               tsk->max_pkt = tipc_node_get_mtu(net, dnode,
+                                                                portid);
+                               m->msg_iter = save;
                                goto next;
                        }
                        if (rc != -ELINKCONG)
@@ -1085,7 +1131,7 @@ next:
                }
                rc = tipc_wait_for_sndpkt(sock, &timeo);
                if (rc)
-                       __skb_queue_purge(&head);
+                       __skb_queue_purge(pktchain);
        } while (!rc);
 exit:
        if (iocb)
@@ -1118,6 +1164,8 @@ static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
                                u32 peer_node)
 {
+       struct sock *sk = &tsk->sk;
+       struct net *net = sock_net(sk);
        struct tipc_msg *msg = &tsk->phdr;
 
        msg_set_destnode(msg, peer_node);
@@ -1126,12 +1174,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
        msg_set_lookup_scope(msg, 0);
        msg_set_hdr_sz(msg, SHORT_H_SIZE);
 
-       tsk->probing_interval = CONN_PROBING_INTERVAL;
+       tsk->probing_intv = CONN_PROBING_INTERVAL;
        tsk->probing_state = TIPC_CONN_OK;
        tsk->connected = 1;
-       k_start_timer(&tsk->timer, tsk->probing_interval);
-       tipc_node_add_conn(peer_node, tsk->ref, peer_port);
-       tsk->max_pkt = tipc_node_get_mtu(peer_node, tsk->ref);
+       sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
+       tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
+       tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
 }
 
 /**
@@ -1230,6 +1278,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 
 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 {
+       struct net *net = sock_net(&tsk->sk);
        struct sk_buff *skb = NULL;
        struct tipc_msg *msg;
        u32 peer_port = tsk_peer_port(tsk);
@@ -1237,13 +1286,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 
        if (!tsk->connected)
                return;
-       skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
-                             tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
+       skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
+                             dnode, tsk_own_node(tsk), peer_port,
+                             tsk->portid, TIPC_OK);
        if (!skb)
                return;
        msg = buf_msg(skb);
        msg_set_msgcnt(msg, ack);
-       tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
+       tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1529,15 +1579,16 @@ static void tipc_data_ready(struct sock *sk)
 /**
  * filter_connect - Handle all incoming messages for a connection-based socket
  * @tsk: TIPC socket
- * @msg: message
+ * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
  */
-static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
+static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
 {
        struct sock *sk = &tsk->sk;
+       struct net *net = sock_net(sk);
        struct socket *sock = sk->sk_socket;
-       struct tipc_msg *msg = buf_msg(*buf);
+       struct tipc_msg *msg = buf_msg(*skb);
        int retval = -TIPC_ERR_NO_PORT;
 
        if (msg_mcast(msg))
@@ -1551,8 +1602,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
                                sock->state = SS_DISCONNECTING;
                                tsk->connected = 0;
                                /* let timer expire on it's own */
-                               tipc_node_remove_conn(tsk_peer_node(tsk),
-                                                     tsk->ref);
+                               tipc_node_remove_conn(net, tsk_peer_node(tsk),
+                                                     tsk->portid);
                        }
                        retval = TIPC_OK;
                }
@@ -1587,8 +1638,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
                 * connect() routine if sleeping.
                 */
                if (msg_data_sz(msg) == 0) {
-                       kfree_skb(*buf);
-                       *buf = NULL;
+                       kfree_skb(*skb);
+                       *skb = NULL;
                        if (waitqueue_active(sk_sleep(sk)))
                                wake_up_interruptible(sk_sleep(sk));
                }
@@ -1640,32 +1691,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
 /**
  * filter_rcv - validate incoming message
  * @sk: socket
- * @buf: message
+ * @skb: pointer to message. Set to NULL if buffer is consumed.
  *
  * Enqueues message on receive queue if acceptable; optionally handles
  * disconnect indication for a connected socket.
  *
- * Called with socket lock already taken; port lock may also be taken.
+ * Called with socket lock already taken
  *
- * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
- * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
+ * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
  */
-static int filter_rcv(struct sock *sk, struct sk_buff *buf)
+static int filter_rcv(struct sock *sk, struct sk_buff **skb)
 {
        struct socket *sock = sk->sk_socket;
        struct tipc_sock *tsk = tipc_sk(sk);
-       struct tipc_msg *msg = buf_msg(buf);
-       unsigned int limit = rcvbuf_limit(sk, buf);
-       u32 onode;
+       struct tipc_msg *msg = buf_msg(*skb);
+       unsigned int limit = rcvbuf_limit(sk, *skb);
        int rc = TIPC_OK;
 
-       if (unlikely(msg_user(msg) == CONN_MANAGER))
-               return tipc_sk_proto_rcv(tsk, &onode, buf);
+       if (unlikely(msg_user(msg) == CONN_MANAGER)) {
+               tipc_sk_proto_rcv(tsk, skb);
+               return TIPC_OK;
+       }
 
        if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
-               kfree_skb(buf);
+               kfree_skb(*skb);
                tsk->link_cong = 0;
                sk->sk_write_space(sk);
+               *skb = NULL;
                return TIPC_OK;
        }
 
@@ -1677,21 +1729,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
                if (msg_connected(msg))
                        return -TIPC_ERR_NO_PORT;
        } else {
-               rc = filter_connect(tsk, &buf);
-               if (rc != TIPC_OK || buf == NULL)
+               rc = filter_connect(tsk, skb);
+               if (rc != TIPC_OK || !*skb)
                        return rc;
        }
 
        /* Reject message if there isn't room to queue it */
-       if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
+       if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
                return -TIPC_ERR_OVERLOAD;
 
        /* Enqueue message */
-       TIPC_SKB_CB(buf)->handle = NULL;
-       __skb_queue_tail(&sk->sk_receive_queue, buf);
-       skb_set_owner_r(buf, sk);
+       TIPC_SKB_CB(*skb)->handle = NULL;
+       __skb_queue_tail(&sk->sk_receive_queue, *skb);
+       skb_set_owner_r(*skb, sk);
 
        sk->sk_data_ready(sk);
+       *skb = NULL;
        return TIPC_OK;
 }
 
@@ -1700,78 +1753,125 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
  * @sk: socket
  * @skb: message
  *
- * Caller must hold socket lock, but not port lock.
+ * Caller must hold socket lock
  *
  * Returns 0
  */
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-       int rc;
-       u32 onode;
+       int err;
+       atomic_t *dcnt;
+       u32 dnode;
        struct tipc_sock *tsk = tipc_sk(sk);
+       struct net *net = sock_net(sk);
        uint truesize = skb->truesize;
 
-       rc = filter_rcv(sk, skb);
-
-       if (likely(!rc)) {
-               if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
-                       atomic_add(truesize, &tsk->dupl_rcvcnt);
+       err = filter_rcv(sk, &skb);
+       if (likely(!skb)) {
+               dcnt = &tsk->dupl_rcvcnt;
+               if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
+                       atomic_add(truesize, dcnt);
                return 0;
        }
+       if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
+               tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
+       return 0;
+}
 
-       if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
-               return 0;
-
-       tipc_link_xmit_skb(skb, onode, 0);
+/**
+ * tipc_sk_enqueue - extract all buffers with destination 'dport' from
+ *                   inputq and try adding them to socket or backlog queue
+ * @inputq: list of incoming buffers with potentially different destinations
+ * @sk: socket where the buffers should be enqueued
+ * @dport: port number for the socket
+ * @_skb: returned buffer to be forwarded or rejected, if applicable
+ *
+ * Caller must hold socket lock
+ *
+ * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
+ * or -TIPC_ERR_NO_PORT
+ */
+static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+                          u32 dport, struct sk_buff **_skb)
+{
+       unsigned int lim;
+       atomic_t *dcnt;
+       int err;
+       struct sk_buff *skb;
+       unsigned long time_limit = jiffies + 2;
 
-       return 0;
+       while (skb_queue_len(inputq)) {
+               if (unlikely(time_after_eq(jiffies, time_limit)))
+                       return TIPC_OK;
+               skb = tipc_skb_dequeue(inputq, dport);
+               if (unlikely(!skb))
+                       return TIPC_OK;
+               if (!sock_owned_by_user(sk)) {
+                       err = filter_rcv(sk, &skb);
+                       if (likely(!skb))
+                               continue;
+                       *_skb = skb;
+                       return err;
+               }
+               dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+               if (sk->sk_backlog.len)
+                       atomic_set(dcnt, 0);
+               lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+               if (likely(!sk_add_backlog(sk, skb, lim)))
+                       continue;
+               *_skb = skb;
+               return -TIPC_ERR_OVERLOAD;
+       }
+       return TIPC_OK;
 }
 
 /**
- * tipc_sk_rcv - handle incoming message
- * @skb: buffer containing arriving message
- * Consumes buffer
- * Returns 0 if success, or errno: -EHOSTUNREACH
+ * tipc_sk_rcv - handle a chain of incoming buffers
+ * @inputq: buffer list containing the buffers
+ * Consumes all buffers in list until inputq is empty
+ * Note: may be called in multiple threads referring to the same queue
+ * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
+ * Only node local calls check the return value, sending single-buffer queues
  */
-int tipc_sk_rcv(struct sk_buff *skb)
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
+       u32 dnode, dport = 0;
+       int err = -TIPC_ERR_NO_PORT;
+       struct sk_buff *skb;
        struct tipc_sock *tsk;
+       struct tipc_net *tn;
        struct sock *sk;
-       u32 dport = msg_destport(buf_msg(skb));
-       int rc = TIPC_OK;
-       uint limit;
-       u32 dnode;
 
-       /* Validate destination and message */
-       tsk = tipc_sk_get(dport);
-       if (unlikely(!tsk)) {
-               rc = tipc_msg_eval(skb, &dnode);
-               goto exit;
+       while (skb_queue_len(inputq)) {
+               skb = NULL;
+               dport = tipc_skb_peek_port(inputq, dport);
+               tsk = tipc_sk_lookup(net, dport);
+               if (likely(tsk)) {
+                       sk = &tsk->sk;
+                       if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+                               err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+                               spin_unlock_bh(&sk->sk_lock.slock);
+                               dport = 0;
+                       }
+                       sock_put(sk);
+               } else {
+                       skb = tipc_skb_dequeue(inputq, dport);
+               }
+               if (likely(!skb))
+                       continue;
+               if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
+                       goto xmit;
+               if (!err) {
+                       dnode = msg_destnode(buf_msg(skb));
+                       goto xmit;
+               }
+               tn = net_generic(net, tipc_net_id);
+               if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+                       continue;
+xmit:
+               tipc_link_xmit_skb(net, skb, dnode, dport);
        }
-       sk = &tsk->sk;
-
-       /* Queue message */
-       spin_lock_bh(&sk->sk_lock.slock);
-
-       if (!sock_owned_by_user(sk)) {
-               rc = filter_rcv(sk, skb);
-       } else {
-               if (sk->sk_backlog.len == 0)
-                       atomic_set(&tsk->dupl_rcvcnt, 0);
-               limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
-               if (sk_add_backlog(sk, skb, limit))
-                       rc = -TIPC_ERR_OVERLOAD;
-       }
-       spin_unlock_bh(&sk->sk_lock.slock);
-       tipc_sk_put(tsk);
-       if (likely(!rc))
-               return 0;
-exit:
-       if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
-               return -EHOSTUNREACH;
-
-       tipc_link_xmit_skb(skb, dnode, 0);
-       return (rc < 0) ? -EHOSTUNREACH : 0;
+       return err ? -EHOSTUNREACH : 0;
 }
 
 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2027,6 +2127,7 @@ exit:
 static int tipc_shutdown(struct socket *sock, int how)
 {
        struct sock *sk = sock->sk;
+       struct net *net = sock_net(sk);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct sk_buff *skb;
        u32 dnode;
@@ -2049,21 +2150,24 @@ restart:
                                kfree_skb(skb);
                                goto restart;
                        }
-                       if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
-                               tipc_link_xmit_skb(skb, dnode, tsk->ref);
-                       tipc_node_remove_conn(dnode, tsk->ref);
+                       if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
+                                            TIPC_CONN_SHUTDOWN))
+                               tipc_link_xmit_skb(net, skb, dnode,
+                                                  tsk->portid);
+                       tipc_node_remove_conn(net, dnode, tsk->portid);
                } else {
                        dnode = tsk_peer_node(tsk);
+
                        skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                              TIPC_CONN_MSG, SHORT_H_SIZE,
-                                             0, dnode, tipc_own_addr,
+                                             0, dnode, tsk_own_node(tsk),
                                              tsk_peer_port(tsk),
-                                             tsk->ref, TIPC_CONN_SHUTDOWN);
-                       tipc_link_xmit_skb(skb, dnode, tsk->ref);
+                                             tsk->portid, TIPC_CONN_SHUTDOWN);
+                       tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
                }
                tsk->connected = 0;
                sock->state = SS_DISCONNECTING;
-               tipc_node_remove_conn(dnode, tsk->ref);
+               tipc_node_remove_conn(net, dnode, tsk->portid);
                /* fall through */
 
        case SS_DISCONNECTING:
@@ -2084,18 +2188,14 @@ restart:
        return res;
 }
 
-static void tipc_sk_timeout(unsigned long ref)
+static void tipc_sk_timeout(unsigned long data)
 {
-       struct tipc_sock *tsk;
-       struct sock *sk;
+       struct tipc_sock *tsk = (struct tipc_sock *)data;
+       struct sock *sk = &tsk->sk;
        struct sk_buff *skb = NULL;
        u32 peer_port, peer_node;
+       u32 own_node = tsk_own_node(tsk);
 
-       tsk = tipc_sk_get(ref);
-       if (!tsk)
-               return;
-
-       sk = &tsk->sk;
        bh_lock_sock(sk);
        if (!tsk->connected) {
                bh_unlock_sock(sk);
@@ -2106,38 +2206,39 @@ static void tipc_sk_timeout(unsigned long ref)
 
        if (tsk->probing_state == TIPC_CONN_PROBING) {
                /* Previous probe not answered -> self abort */
-               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
-                                     SHORT_H_SIZE, 0, tipc_own_addr,
-                                     peer_node, ref, peer_port,
-                                     TIPC_ERR_NO_PORT);
+               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
+                                     TIPC_CONN_MSG, SHORT_H_SIZE, 0,
+                                     own_node, peer_node, tsk->portid,
+                                     peer_port, TIPC_ERR_NO_PORT);
        } else {
-               skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
-                                     0, peer_node, tipc_own_addr,
-                                     peer_port, ref, TIPC_OK);
+               skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
+                                     INT_H_SIZE, 0, peer_node, own_node,
+                                     peer_port, tsk->portid, TIPC_OK);
                tsk->probing_state = TIPC_CONN_PROBING;
-               k_start_timer(&tsk->timer, tsk->probing_interval);
+               sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
        }
        bh_unlock_sock(sk);
        if (skb)
-               tipc_link_xmit_skb(skb, peer_node, ref);
+               tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
 exit:
-       tipc_sk_put(tsk);
+       sock_put(sk);
 }
 
 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq)
 {
+       struct net *net = sock_net(&tsk->sk);
        struct publication *publ;
        u32 key;
 
        if (tsk->connected)
                return -EINVAL;
-       key = tsk->ref + tsk->pub_count + 1;
-       if (key == tsk->ref)
+       key = tsk->portid + tsk->pub_count + 1;
+       if (key == tsk->portid)
                return -EADDRINUSE;
 
-       publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
-                                   scope, tsk->ref, key);
+       publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
+                                   scope, tsk->portid, key);
        if (unlikely(!publ))
                return -EINVAL;
 
@@ -2150,6 +2251,7 @@ static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                            struct tipc_name_seq const *seq)
 {
+       struct net *net = sock_net(&tsk->sk);
        struct publication *publ;
        struct publication *safe;
        int rc = -EINVAL;
@@ -2164,12 +2266,12 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                                continue;
                        if (publ->upper != seq->upper)
                                break;
-                       tipc_nametbl_withdraw(publ->type, publ->lower,
+                       tipc_nametbl_withdraw(net, publ->type, publ->lower,
                                              publ->ref, publ->key);
                        rc = 0;
                        break;
                }
-               tipc_nametbl_withdraw(publ->type, publ->lower,
+               tipc_nametbl_withdraw(net, publ->type, publ->lower,
                                      publ->ref, publ->key);
                rc = 0;
        }
@@ -2178,336 +2280,105 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
        return rc;
 }
 
-static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
-                       int len, int full_id)
-{
-       struct publication *publ;
-       int ret;
-
-       if (full_id)
-               ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
-                                   tipc_zone(tipc_own_addr),
-                                   tipc_cluster(tipc_own_addr),
-                                   tipc_node(tipc_own_addr), tsk->ref);
-       else
-               ret = tipc_snprintf(buf, len, "%-10u:", tsk->ref);
-
-       if (tsk->connected) {
-               u32 dport = tsk_peer_port(tsk);
-               u32 destnode = tsk_peer_node(tsk);
-
-               ret += tipc_snprintf(buf + ret, len - ret,
-                                    " connected to <%u.%u.%u:%u>",
-                                    tipc_zone(destnode),
-                                    tipc_cluster(destnode),
-                                    tipc_node(destnode), dport);
-               if (tsk->conn_type != 0)
-                       ret += tipc_snprintf(buf + ret, len - ret,
-                                            " via {%u,%u}", tsk->conn_type,
-                                            tsk->conn_instance);
-       } else if (tsk->published) {
-               ret += tipc_snprintf(buf + ret, len - ret, " bound to");
-               list_for_each_entry(publ, &tsk->publications, pport_list) {
-                       if (publ->lower == publ->upper)
-                               ret += tipc_snprintf(buf + ret, len - ret,
-                                                    " {%u,%u}", publ->type,
-                                                    publ->lower);
-                       else
-                               ret += tipc_snprintf(buf + ret, len - ret,
-                                                    " {%u,%u,%u}", publ->type,
-                                                    publ->lower, publ->upper);
-               }
-       }
-       ret += tipc_snprintf(buf + ret, len - ret, "\n");
-       return ret;
-}
-
-struct sk_buff *tipc_sk_socks_show(void)
-{
-       struct sk_buff *buf;
-       struct tlv_desc *rep_tlv;
-       char *pb;
-       int pb_len;
-       struct tipc_sock *tsk;
-       int str_len = 0;
-       u32 ref = 0;
-
-       buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
-       if (!buf)
-               return NULL;
-       rep_tlv = (struct tlv_desc *)buf->data;
-       pb = TLV_DATA(rep_tlv);
-       pb_len = ULTRA_STRING_MAX_LEN;
-
-       tsk = tipc_sk_get_next(&ref);
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               str_len += tipc_sk_show(tsk, pb + str_len,
-                                       pb_len - str_len, 0);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
-       }
-       str_len += 1;   /* for "\0" */
-       skb_put(buf, TLV_SPACE(str_len));
-       TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
-
-       return buf;
-}
-
 /* tipc_sk_reinit: set non-zero address in all existing sockets
  *                 when we go from standalone to network mode.
  */
-void tipc_sk_reinit(void)
+void tipc_sk_reinit(struct net *net)
 {
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
+       struct tipc_sock *tsk;
        struct tipc_msg *msg;
-       u32 ref = 0;
-       struct tipc_sock *tsk = tipc_sk_get_next(&ref);
+       int i;
 
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               msg = &tsk->phdr;
-               msg_set_prevnode(msg, tipc_own_addr);
-               msg_set_orignode(msg, tipc_own_addr);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
+       for (i = 0; i < tbl->size; i++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       msg = &tsk->phdr;
+                       msg_set_prevnode(msg, tn->own_addr);
+                       msg_set_orignode(msg, tn->own_addr);
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+               }
        }
+       rcu_read_unlock();
 }
 
-/**
- * struct reference - TIPC socket reference entry
- * @tsk: pointer to socket associated with reference entry
- * @ref: reference value for socket (combines instance & array index info)
- */
-struct reference {
-       struct tipc_sock *tsk;
-       u32 ref;
-};
-
-/**
- * struct tipc_ref_table - table of TIPC socket reference entries
- * @entries: pointer to array of reference entries
- * @capacity: array index of first unusable entry
- * @init_point: array index of first uninitialized entry
- * @first_free: array index of first unused socket reference entry
- * @last_free: array index of last unused socket reference entry
- * @index_mask: bitmask for array index portion of reference values
- * @start_mask: initial value for instance value portion of reference values
- */
-struct ref_table {
-       struct reference *entries;
-       u32 capacity;
-       u32 init_point;
-       u32 first_free;
-       u32 last_free;
-       u32 index_mask;
-       u32 start_mask;
-};
-
-/* Socket reference table consists of 2**N entries.
- *
- * State       Socket ptr      Reference
- * -----        ----------      ---------
- * In use        non-NULL       XXXX|own index
- *                             (XXXX changes each time entry is acquired)
- * Free            NULL         YYYY|next free index
- *                             (YYYY is one more than last used XXXX)
- * Uninitialized   NULL         0
- *
- * Entry 0 is not used; this allows index 0 to denote the end of the free list.
- *
- * Note that a reference value of 0 does not necessarily indicate that an
- * entry is uninitialized, since the last entry in the free list could also
- * have a reference value of 0 (although this is unlikely).
- */
-
-static struct ref_table tipc_ref_table;
-
-static DEFINE_RWLOCK(ref_table_lock);
-
-/**
- * tipc_ref_table_init - create reference table for sockets
- */
-int tipc_sk_ref_table_init(u32 req_sz, u32 start)
+static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
 {
-       struct reference *table;
-       u32 actual_sz;
-
-       /* account for unused entry, then round up size to a power of 2 */
-
-       req_sz++;
-       for (actual_sz = 16; actual_sz < req_sz; actual_sz <<= 1) {
-               /* do nothing */
-       };
-
-       /* allocate table & mark all entries as uninitialized */
-       table = vzalloc(actual_sz * sizeof(struct reference));
-       if (table == NULL)
-               return -ENOMEM;
-
-       tipc_ref_table.entries = table;
-       tipc_ref_table.capacity = req_sz;
-       tipc_ref_table.init_point = 1;
-       tipc_ref_table.first_free = 0;
-       tipc_ref_table.last_free = 0;
-       tipc_ref_table.index_mask = actual_sz - 1;
-       tipc_ref_table.start_mask = start & ~tipc_ref_table.index_mask;
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct tipc_sock *tsk;
 
-       return 0;
-}
+       rcu_read_lock();
+       tsk = rhashtable_lookup(&tn->sk_rht, &portid);
+       if (tsk)
+               sock_hold(&tsk->sk);
+       rcu_read_unlock();
 
-/**
- * tipc_ref_table_stop - destroy reference table for sockets
- */
-void tipc_sk_ref_table_stop(void)
-{
-       if (!tipc_ref_table.entries)
-               return;
-       vfree(tipc_ref_table.entries);
-       tipc_ref_table.entries = NULL;
+       return tsk;
 }
 
-/* tipc_ref_acquire - create reference to a socket
- *
- * Register an socket pointer in the reference table.
- * Returns a unique reference value that is used from then on to retrieve the
- * socket pointer, or to determine if the socket has been deregistered.
- */
-u32 tipc_sk_ref_acquire(struct tipc_sock *tsk)
+static int tipc_sk_insert(struct tipc_sock *tsk)
 {
-       u32 index;
-       u32 index_mask;
-       u32 next_plus_upper;
-       u32 ref = 0;
-       struct reference *entry;
-
-       if (unlikely(!tsk)) {
-               pr_err("Attempt to acquire ref. to non-existent obj\n");
-               return 0;
-       }
-       if (unlikely(!tipc_ref_table.entries)) {
-               pr_err("Ref. table not found in acquisition attempt\n");
-               return 0;
-       }
-
-       /* Take a free entry, if available; otherwise initialize a new one */
-       write_lock_bh(&ref_table_lock);
-       index = tipc_ref_table.first_free;
-       entry = &tipc_ref_table.entries[index];
-
-       if (likely(index)) {
-               index = tipc_ref_table.first_free;
-               entry = &tipc_ref_table.entries[index];
-               index_mask = tipc_ref_table.index_mask;
-               next_plus_upper = entry->ref;
-               tipc_ref_table.first_free = next_plus_upper & index_mask;
-               ref = (next_plus_upper & ~index_mask) + index;
-               entry->tsk = tsk;
-       } else if (tipc_ref_table.init_point < tipc_ref_table.capacity) {
-               index = tipc_ref_table.init_point++;
-               entry = &tipc_ref_table.entries[index];
-               ref = tipc_ref_table.start_mask + index;
+       struct sock *sk = &tsk->sk;
+       struct net *net = sock_net(sk);
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
+       u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
+
+       while (remaining--) {
+               portid++;
+               if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
+                       portid = TIPC_MIN_PORT;
+               tsk->portid = portid;
+               sock_hold(&tsk->sk);
+               if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
+                       return 0;
+               sock_put(&tsk->sk);
        }
 
-       if (ref) {
-               entry->ref = ref;
-               entry->tsk = tsk;
-       }
-       write_unlock_bh(&ref_table_lock);
-       return ref;
+       return -1;
 }
 
-/* tipc_sk_ref_discard - invalidate reference to an socket
- *
- * Disallow future references to an socket and free up the entry for re-use.
- */
-void tipc_sk_ref_discard(u32 ref)
+static void tipc_sk_remove(struct tipc_sock *tsk)
 {
-       struct reference *entry;
-       u32 index;
-       u32 index_mask;
-
-       if (unlikely(!tipc_ref_table.entries)) {
-               pr_err("Ref. table not found during discard attempt\n");
-               return;
-       }
-
-       index_mask = tipc_ref_table.index_mask;
-       index = ref & index_mask;
-       entry = &tipc_ref_table.entries[index];
-
-       write_lock_bh(&ref_table_lock);
+       struct sock *sk = &tsk->sk;
+       struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
 
-       if (unlikely(!entry->tsk)) {
-               pr_err("Attempt to discard ref. to non-existent socket\n");
-               goto exit;
+       if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
+               WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
+               __sock_put(sk);
        }
-       if (unlikely(entry->ref != ref)) {
-               pr_err("Attempt to discard non-existent reference\n");
-               goto exit;
-       }
-
-       /* Mark entry as unused; increment instance part of entry's
-        *   reference to invalidate any subsequent references
-        */
-
-       entry->tsk = NULL;
-       entry->ref = (ref & ~index_mask) + (index_mask + 1);
-
-       /* Append entry to free entry list */
-       if (unlikely(tipc_ref_table.first_free == 0))
-               tipc_ref_table.first_free = index;
-       else
-               tipc_ref_table.entries[tipc_ref_table.last_free].ref |= index;
-       tipc_ref_table.last_free = index;
-exit:
-       write_unlock_bh(&ref_table_lock);
 }
 
-/* tipc_sk_get - find referenced socket and return pointer to it
- */
-struct tipc_sock *tipc_sk_get(u32 ref)
+int tipc_sk_rht_init(struct net *net)
 {
-       struct reference *entry;
-       struct tipc_sock *tsk;
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct rhashtable_params rht_params = {
+               .nelem_hint = 192,
+               .head_offset = offsetof(struct tipc_sock, node),
+               .key_offset = offsetof(struct tipc_sock, portid),
+               .key_len = sizeof(u32), /* portid */
+               .hashfn = jhash,
+               .max_shift = 20, /* 1M */
+               .min_shift = 8,  /* 256 */
+               .grow_decision = rht_grow_above_75,
+               .shrink_decision = rht_shrink_below_30,
+       };
 
-       if (unlikely(!tipc_ref_table.entries))
-               return NULL;
-       read_lock_bh(&ref_table_lock);
-       entry = &tipc_ref_table.entries[ref & tipc_ref_table.index_mask];
-       tsk = entry->tsk;
-       if (likely(tsk && (entry->ref == ref)))
-               sock_hold(&tsk->sk);
-       else
-               tsk = NULL;
-       read_unlock_bh(&ref_table_lock);
-       return tsk;
+       return rhashtable_init(&tn->sk_rht, &rht_params);
 }
 
-/* tipc_sk_get_next - lock & return next socket after referenced one
-*/
-struct tipc_sock *tipc_sk_get_next(u32 *ref)
+void tipc_sk_rht_destroy(struct net *net)
 {
-       struct reference *entry;
-       struct tipc_sock *tsk = NULL;
-       uint index = *ref & tipc_ref_table.index_mask;
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
 
-       read_lock_bh(&ref_table_lock);
-       while (++index < tipc_ref_table.capacity) {
-               entry = &tipc_ref_table.entries[index];
-               if (!entry->tsk)
-                       continue;
-               tsk = entry->tsk;
-               sock_hold(&tsk->sk);
-               *ref = entry->ref;
-               break;
-       }
-       read_unlock_bh(&ref_table_lock);
-       return tsk;
-}
+       /* Wait for socket readers to complete */
+       synchronize_net();
 
-static void tipc_sk_put(struct tipc_sock *tsk)
-{
-       sock_put(&tsk->sk);
+       rhashtable_destroy(&tn->sk_rht);
 }
 
 /**
@@ -2639,8 +2510,9 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
        return put_user(sizeof(value), ol);
 }
 
-static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
+static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
 {
+       struct sock *sk = sock->sk;
        struct tipc_sioc_ln_req lnr;
        void __user *argp = (void __user *)arg;
 
@@ -2648,7 +2520,8 @@ static int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
        case SIOCGETLINKNAME:
                if (copy_from_user(&lnr, argp, sizeof(lnr)))
                        return -EFAULT;
-               if (!tipc_node_get_linkname(lnr.bearer_id & 0xffff, lnr.peer,
+               if (!tipc_node_get_linkname(sock_net(sk),
+                                           lnr.bearer_id & 0xffff, lnr.peer,
                                            lnr.linkname, TIPC_MAX_LINK_NAME)) {
                        if (copy_to_user(argp, &lnr, sizeof(lnr)))
                                return -EFAULT;
@@ -2820,18 +2693,20 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
        int err;
        void *hdr;
        struct nlattr *attrs;
+       struct net *net = sock_net(skb->sk);
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
 
        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
-                         &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
+                         &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
        if (!hdr)
                goto msg_cancel;
 
        attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
        if (!attrs)
                goto genlmsg_cancel;
-       if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->ref))
+       if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
                goto attr_msg_cancel;
-       if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr))
+       if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
                goto attr_msg_cancel;
 
        if (tsk->connected) {
@@ -2859,22 +2734,37 @@ int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
        int err;
        struct tipc_sock *tsk;
-       u32 prev_ref = cb->args[0];
-       u32 ref = prev_ref;
-
-       tsk = tipc_sk_get_next(&ref);
-       for (; tsk; tsk = tipc_sk_get_next(&ref)) {
-               lock_sock(&tsk->sk);
-               err = __tipc_nl_add_sk(skb, cb, tsk);
-               release_sock(&tsk->sk);
-               tipc_sk_put(tsk);
-               if (err)
-                       break;
+       const struct bucket_table *tbl;
+       struct rhash_head *pos;
+       struct net *net = sock_net(skb->sk);
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       u32 tbl_id = cb->args[0];
+       u32 prev_portid = cb->args[1];
 
-               prev_ref = ref;
-       }
+       rcu_read_lock();
+       tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
+       for (; tbl_id < tbl->size; tbl_id++) {
+               rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
+                       spin_lock_bh(&tsk->sk.sk_lock.slock);
+                       if (prev_portid && prev_portid != tsk->portid) {
+                               spin_unlock_bh(&tsk->sk.sk_lock.slock);
+                               continue;
+                       }
 
-       cb->args[0] = prev_ref;
+                       err = __tipc_nl_add_sk(skb, cb, tsk);
+                       if (err) {
+                               prev_portid = tsk->portid;
+                               spin_unlock_bh(&tsk->sk.sk_lock.slock);
+                               goto out;
+                       }
+                       prev_portid = 0;
+                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
+               }
+       }
+out:
+       rcu_read_unlock();
+       cb->args[0] = tbl_id;
+       cb->args[1] = prev_portid;
 
        return skb->len;
 }
@@ -2888,7 +2778,7 @@ static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
        struct nlattr *attrs;
 
        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
-                         &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
+                         &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
        if (!hdr)
                goto msg_cancel;
 
@@ -2962,12 +2852,13 @@ static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
        int err;
-       u32 tsk_ref = cb->args[0];
+       u32 tsk_portid = cb->args[0];
        u32 last_publ = cb->args[1];
        u32 done = cb->args[2];
+       struct net *net = sock_net(skb->sk);
        struct tipc_sock *tsk;
 
-       if (!tsk_ref) {
+       if (!tsk_portid) {
                struct nlattr **attrs;
                struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
 
@@ -2984,13 +2875,13 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
                if (!sock[TIPC_NLA_SOCK_REF])
                        return -EINVAL;
 
-               tsk_ref = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
+               tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
        }
 
        if (done)
                return 0;
 
-       tsk = tipc_sk_get(tsk_ref);
+       tsk = tipc_sk_lookup(net, tsk_portid);
        if (!tsk)
                return -EINVAL;
 
@@ -2999,9 +2890,9 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
        if (!err)
                done = 1;
        release_sock(&tsk->sk);
-       tipc_sk_put(tsk);
+       sock_put(&tsk->sk);
 
-       cb->args[0] = tsk_ref;
+       cb->args[0] = tsk_portid;
        cb->args[1] = last_publ;
        cb->args[2] = done;