Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/s390/linux
[cascardo/linux.git] / net / tipc / socket.c
index 679a220..f73e975 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/socket.c: TIPC socket API
  *
- * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
+ * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -40,7 +40,7 @@
 #include "name_table.h"
 #include "node.h"
 #include "link.h"
-#include "config.h"
+#include "name_distr.h"
 #include "socket.h"
 
 #define SS_LISTENING           -1      /* socket is listening */
@@ -69,8 +69,6 @@
  * @pub_count: total # of publications port has made during its lifetime
  * @probing_state:
  * @probing_intv:
- * @port: port - interacts with 'sk' and with the rest of the TIPC stack
- * @peer_name: the peer of the connection, if any
  * @conn_timeout: the time we can wait for an unresponded setup request
  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
  * @link_cong: non-zero if owner must sleep because of link congestion
@@ -177,6 +175,11 @@ static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
  *   - port reference
  */
 
+static u32 tsk_own_node(struct tipc_sock *tsk)
+{
+       return msg_prevnode(&tsk->phdr);
+}
+
 static u32 tsk_peer_node(struct tipc_sock *tsk)
 {
        return msg_destnode(&tsk->phdr);
@@ -249,11 +252,11 @@ static void tsk_rej_rx_queue(struct sock *sk)
 {
        struct sk_buff *skb;
        u32 dnode;
-       struct net *net = sock_net(sk);
+       u32 own_node = tsk_own_node(tipc_sk(sk));
 
        while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
-               if (tipc_msg_reverse(net, skb, &dnode, TIPC_ERR_NO_PORT))
-                       tipc_link_xmit_skb(net, skb, dnode, 0);
+               if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
+                       tipc_link_xmit_skb(sock_net(sk), skb, dnode, 0);
        }
 }
 
@@ -305,6 +308,7 @@ static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
 static int tipc_sk_create(struct net *net, struct socket *sock,
                          int protocol, int kern)
 {
+       struct tipc_net *tn;
        const struct proto_ops *ops;
        socket_state state;
        struct sock *sk;
@@ -346,7 +350,8 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
        tsk->max_pkt = MAX_PKT_DEFAULT;
        INIT_LIST_HEAD(&tsk->publications);
        msg = &tsk->phdr;
-       tipc_msg_init(net, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
+       tn = net_generic(sock_net(sk), tipc_net_id);
+       tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
                      NAMED_H_SIZE, 0);
 
        /* Finish initializing socket data structures */
@@ -471,7 +476,6 @@ static int tipc_release(struct socket *sock)
 {
        struct sock *sk = sock->sk;
        struct net *net;
-       struct tipc_net *tn;
        struct tipc_sock *tsk;
        struct sk_buff *skb;
        u32 dnode, probing_state;
@@ -484,8 +488,6 @@ static int tipc_release(struct socket *sock)
                return 0;
 
        net = sock_net(sk);
-       tn = net_generic(net, tipc_net_id);
-
        tsk = tipc_sk(sk);
        lock_sock(sk);
 
@@ -507,7 +509,7 @@ static int tipc_release(struct socket *sock)
                                tsk->connected = 0;
                                tipc_node_remove_conn(net, dnode, tsk->portid);
                        }
-                       if (tipc_msg_reverse(net, skb, &dnode,
+                       if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
                                             TIPC_ERR_NO_PORT))
                                tipc_link_xmit_skb(net, skb, dnode, 0);
                }
@@ -520,9 +522,9 @@ static int tipc_release(struct socket *sock)
                sock_put(sk);
        tipc_sk_remove(tsk);
        if (tsk->connected) {
-               skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
+               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
-                                     tn->own_addr, tsk_peer_port(tsk),
+                                     tsk_own_node(tsk), tsk_peer_port(tsk),
                                      tsk->portid, TIPC_ERR_NO_PORT);
                if (skb)
                        tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
@@ -730,9 +732,11 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
                          struct msghdr *msg, size_t dsz, long timeo)
 {
        struct sock *sk = sock->sk;
+       struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
-       struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
-       struct sk_buff_head head;
+       struct tipc_msg *mhdr = &tsk->phdr;
+       struct sk_buff_head *pktchain = &sk->sk_write_queue;
+       struct iov_iter save = msg->msg_iter;
        uint mtu;
        int rc;
 
@@ -747,80 +751,97 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 
 new_mtu:
        mtu = tipc_bclink_get_mtu();
-       __skb_queue_head_init(&head);
-       rc = tipc_msg_build(net, mhdr, msg, 0, dsz, mtu, &head);
+       rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
        if (unlikely(rc < 0))
                return rc;
 
        do {
-               rc = tipc_bclink_xmit(net, &head);
+               rc = tipc_bclink_xmit(net, pktchain);
                if (likely(rc >= 0)) {
                        rc = dsz;
                        break;
                }
-               if (rc == -EMSGSIZE)
+               if (rc == -EMSGSIZE) {
+                       msg->msg_iter = save;
                        goto new_mtu;
+               }
                if (rc != -ELINKCONG)
                        break;
                tipc_sk(sk)->link_cong = 1;
                rc = tipc_wait_for_sndmsg(sock, &timeo);
                if (rc)
-                       __skb_queue_purge(&head);
+                       __skb_queue_purge(pktchain);
        } while (!rc);
        return rc;
 }
 
-/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+/**
+ * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
+ * @arrvq: queue with arriving messages, to be cloned after destination lookup
+ * @inputq: queue with cloned messages, delivered to socket after dest lookup
+ *
+ * Multi-threaded: parallel calls with reference to same queues may occur
  */
-void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
+void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
+                      struct sk_buff_head *inputq)
 {
-       struct tipc_msg *msg = buf_msg(buf);
-       struct tipc_port_list dports = {0, NULL, };
-       struct tipc_port_list *item;
-       struct sk_buff *b;
-       uint i, last, dst = 0;
+       struct tipc_msg *msg;
+       struct tipc_plist dports;
+       u32 portid;
        u32 scope = TIPC_CLUSTER_SCOPE;
-
-       if (in_own_node(net, msg_orignode(msg)))
-               scope = TIPC_NODE_SCOPE;
-
-       /* Create destination port list: */
-       tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
-                                 msg_nameupper(msg), scope, &dports);
-       last = dports.count;
-       if (!last) {
-               kfree_skb(buf);
-               return;
-       }
-
-       for (item = &dports; item; item = item->next) {
-               for (i = 0; i < PLSIZE && ++dst <= last; i++) {
-                       b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
-                       if (!b) {
-                               pr_warn("Failed do clone mcast rcv buffer\n");
+       struct sk_buff_head tmpq;
+       uint hsz;
+       struct sk_buff *skb, *_skb;
+
+       __skb_queue_head_init(&tmpq);
+       tipc_plist_init(&dports);
+
+       skb = tipc_skb_peek(arrvq, &inputq->lock);
+       for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
+               msg = buf_msg(skb);
+               hsz = skb_headroom(skb) + msg_hdr_sz(msg);
+
+               if (in_own_node(net, msg_orignode(msg)))
+                       scope = TIPC_NODE_SCOPE;
+
+               /* Create destination port list and message clones: */
+               tipc_nametbl_mc_translate(net,
+                                         msg_nametype(msg), msg_namelower(msg),
+                                         msg_nameupper(msg), scope, &dports);
+               portid = tipc_plist_pop(&dports);
+               for (; portid; portid = tipc_plist_pop(&dports)) {
+                       _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
+                       if (_skb) {
+                               msg_set_destport(buf_msg(_skb), portid);
+                               __skb_queue_tail(&tmpq, _skb);
                                continue;
                        }
-                       msg_set_destport(msg, item->ports[i]);
-                       tipc_sk_rcv(net, b);
+                       pr_warn("Failed to clone mcast rcv buffer\n");
+               }
+               /* Append to inputq if not already done by other thread */
+               spin_lock_bh(&inputq->lock);
+               if (skb_peek(arrvq) == skb) {
+                       skb_queue_splice_tail_init(&tmpq, inputq);
+                       kfree_skb(__skb_dequeue(arrvq));
                }
+               spin_unlock_bh(&inputq->lock);
+               __skb_queue_purge(&tmpq);
+               kfree_skb(skb);
        }
-       tipc_port_list_free(&dports);
+       tipc_sk_rcv(net, inputq);
 }
 
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
- * @dnode: node to send response message to, if any
- * @buf: buffer containing protocol message
- * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
- * (CONN_PROBE_REPLY) message should be forwarded.
+ * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
  */
-static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
-                            struct sk_buff *buf)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
 {
-       struct tipc_msg *msg = buf_msg(buf);
+       struct tipc_msg *msg = buf_msg(*skb);
        int conn_cong;
-
+       u32 dnode;
+       u32 own_node = tsk_own_node(tsk);
        /* Ignore if connection cannot be validated: */
        if (!tsk_peer_msg(tsk, msg))
                goto exit;
@@ -833,15 +854,15 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
                if (conn_cong)
                        tsk->sk.sk_write_space(&tsk->sk);
        } else if (msg_type(msg) == CONN_PROBE) {
-               if (!tipc_msg_reverse(sock_net(&tsk->sk), buf, dnode, TIPC_OK))
-                       return TIPC_OK;
-               msg_set_type(msg, CONN_PROBE_REPLY);
-               return TIPC_FWD_MSG;
+               if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
+                       msg_set_type(msg, CONN_PROBE_REPLY);
+                       return;
+               }
        }
        /* Do nothing if msg_type() == CONN_PROBE_REPLY */
 exit:
-       kfree_skb(buf);
-       return TIPC_OK;
+       kfree_skb(*skb);
+       *skb = NULL;
 }
 
 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
@@ -892,9 +913,10 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
        struct net *net = sock_net(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
        u32 dnode, dport;
-       struct sk_buff_head head;
+       struct sk_buff_head *pktchain = &sk->sk_write_queue;
        struct sk_buff *skb;
        struct tipc_name_seq *seq = &dest->addr.nameseq;
+       struct iov_iter save;
        u32 mtu;
        long timeo;
        int rc;
@@ -963,31 +985,33 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
                msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
        }
 
+       save = m->msg_iter;
 new_mtu:
        mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
-       __skb_queue_head_init(&head);
-       rc = tipc_msg_build(net, mhdr, m, 0, dsz, mtu, &head);
+       rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
        if (rc < 0)
                goto exit;
 
        do {
-               skb = skb_peek(&head);
+               skb = skb_peek(pktchain);
                TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
-               rc = tipc_link_xmit(net, &head, dnode, tsk->portid);
+               rc = tipc_link_xmit(net, pktchain, dnode, tsk->portid);
                if (likely(rc >= 0)) {
                        if (sock->state != SS_READY)
                                sock->state = SS_CONNECTING;
                        rc = dsz;
                        break;
                }
-               if (rc == -EMSGSIZE)
+               if (rc == -EMSGSIZE) {
+                       m->msg_iter = save;
                        goto new_mtu;
+               }
                if (rc != -ELINKCONG)
                        break;
                tsk->link_cong = 1;
                rc = tipc_wait_for_sndmsg(sock, &timeo);
                if (rc)
-                       __skb_queue_purge(&head);
+                       __skb_queue_purge(pktchain);
        } while (!rc);
 exit:
        if (iocb)
@@ -1045,13 +1069,14 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
        struct net *net = sock_net(sk);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
-       struct sk_buff_head head;
+       struct sk_buff_head *pktchain = &sk->sk_write_queue;
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
        u32 portid = tsk->portid;
        int rc = -EINVAL;
        long timeo;
        u32 dnode;
        uint mtu, send, sent = 0;
+       struct iov_iter save;
 
        /* Handle implied connection establishment */
        if (unlikely(dest)) {
@@ -1078,15 +1103,15 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
        dnode = tsk_peer_node(tsk);
 
 next:
+       save = m->msg_iter;
        mtu = tsk->max_pkt;
        send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-       __skb_queue_head_init(&head);
-       rc = tipc_msg_build(net, mhdr, m, sent, send, mtu, &head);
+       rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
        if (unlikely(rc < 0))
                goto exit;
        do {
                if (likely(!tsk_conn_cong(tsk))) {
-                       rc = tipc_link_xmit(net, &head, dnode, portid);
+                       rc = tipc_link_xmit(net, pktchain, dnode, portid);
                        if (likely(!rc)) {
                                tsk->sent_unacked++;
                                sent += send;
@@ -1097,6 +1122,7 @@ next:
                        if (rc == -EMSGSIZE) {
                                tsk->max_pkt = tipc_node_get_mtu(net, dnode,
                                                                 portid);
+                               m->msg_iter = save;
                                goto next;
                        }
                        if (rc != -ELINKCONG)
@@ -1105,7 +1131,7 @@ next:
                }
                rc = tipc_wait_for_sndpkt(sock, &timeo);
                if (rc)
-                       __skb_queue_purge(&head);
+                       __skb_queue_purge(pktchain);
        } while (!rc);
 exit:
        if (iocb)
@@ -1253,7 +1279,6 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 {
        struct net *net = sock_net(&tsk->sk);
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct sk_buff *skb = NULL;
        struct tipc_msg *msg;
        u32 peer_port = tsk_peer_port(tsk);
@@ -1261,9 +1286,9 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 
        if (!tsk->connected)
                return;
-       skb = tipc_msg_create(net, CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
-                             dnode, tn->own_addr, peer_port, tsk->portid,
-                             TIPC_OK);
+       skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
+                             dnode, tsk_own_node(tsk), peer_port,
+                             tsk->portid, TIPC_OK);
        if (!skb)
                return;
        msg = buf_msg(skb);
@@ -1554,16 +1579,16 @@ static void tipc_data_ready(struct sock *sk)
 /**
  * filter_connect - Handle all incoming messages for a connection-based socket
  * @tsk: TIPC socket
- * @msg: message
+ * @skb: pointer to message buffer. Set to NULL if buffer is consumed
  *
  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
  */
-static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
+static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
 {
        struct sock *sk = &tsk->sk;
        struct net *net = sock_net(sk);
        struct socket *sock = sk->sk_socket;
-       struct tipc_msg *msg = buf_msg(*buf);
+       struct tipc_msg *msg = buf_msg(*skb);
        int retval = -TIPC_ERR_NO_PORT;
 
        if (msg_mcast(msg))
@@ -1613,8 +1638,8 @@ static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
                 * connect() routine if sleeping.
                 */
                if (msg_data_sz(msg) == 0) {
-                       kfree_skb(*buf);
-                       *buf = NULL;
+                       kfree_skb(*skb);
+                       *skb = NULL;
                        if (waitqueue_active(sk_sleep(sk)))
                                wake_up_interruptible(sk_sleep(sk));
                }
@@ -1666,32 +1691,33 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
 /**
  * filter_rcv - validate incoming message
  * @sk: socket
- * @buf: message
+ * @skb: pointer to message. Set to NULL if buffer is consumed.
  *
  * Enqueues message on receive queue if acceptable; optionally handles
  * disconnect indication for a connected socket.
  *
- * Called with socket lock already taken; port lock may also be taken.
+ * Called with socket lock already taken
  *
- * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
- * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
+ * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
  */
-static int filter_rcv(struct sock *sk, struct sk_buff *buf)
+static int filter_rcv(struct sock *sk, struct sk_buff **skb)
 {
        struct socket *sock = sk->sk_socket;
        struct tipc_sock *tsk = tipc_sk(sk);
-       struct tipc_msg *msg = buf_msg(buf);
-       unsigned int limit = rcvbuf_limit(sk, buf);
-       u32 onode;
+       struct tipc_msg *msg = buf_msg(*skb);
+       unsigned int limit = rcvbuf_limit(sk, *skb);
        int rc = TIPC_OK;
 
-       if (unlikely(msg_user(msg) == CONN_MANAGER))
-               return tipc_sk_proto_rcv(tsk, &onode, buf);
+       if (unlikely(msg_user(msg) == CONN_MANAGER)) {
+               tipc_sk_proto_rcv(tsk, skb);
+               return TIPC_OK;
+       }
 
        if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
-               kfree_skb(buf);
+               kfree_skb(*skb);
                tsk->link_cong = 0;
                sk->sk_write_space(sk);
+               *skb = NULL;
                return TIPC_OK;
        }
 
@@ -1703,21 +1729,22 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
                if (msg_connected(msg))
                        return -TIPC_ERR_NO_PORT;
        } else {
-               rc = filter_connect(tsk, &buf);
-               if (rc != TIPC_OK || buf == NULL)
+               rc = filter_connect(tsk, skb);
+               if (rc != TIPC_OK || !*skb)
                        return rc;
        }
 
        /* Reject message if there isn't room to queue it */
-       if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
+       if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
                return -TIPC_ERR_OVERLOAD;
 
        /* Enqueue message */
-       TIPC_SKB_CB(buf)->handle = NULL;
-       __skb_queue_tail(&sk->sk_receive_queue, buf);
-       skb_set_owner_r(buf, sk);
+       TIPC_SKB_CB(*skb)->handle = NULL;
+       __skb_queue_tail(&sk->sk_receive_queue, *skb);
+       skb_set_owner_r(*skb, sk);
 
        sk->sk_data_ready(sk);
+       *skb = NULL;
        return TIPC_OK;
 }
 
@@ -1726,79 +1753,125 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
  * @sk: socket
  * @skb: message
  *
- * Caller must hold socket lock, but not port lock.
+ * Caller must hold socket lock
  *
  * Returns 0
  */
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
-       int rc;
-       u32 onode;
+       int err;
+       atomic_t *dcnt;
+       u32 dnode;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
        uint truesize = skb->truesize;
 
-       rc = filter_rcv(sk, skb);
-
-       if (likely(!rc)) {
-               if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
-                       atomic_add(truesize, &tsk->dupl_rcvcnt);
+       err = filter_rcv(sk, &skb);
+       if (likely(!skb)) {
+               dcnt = &tsk->dupl_rcvcnt;
+               if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
+                       atomic_add(truesize, dcnt);
                return 0;
        }
-
-       if ((rc < 0) && !tipc_msg_reverse(net, skb, &onode, -rc))
-               return 0;
-
-       tipc_link_xmit_skb(net, skb, onode, 0);
-
+       if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
+               tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
        return 0;
 }
 
 /**
- * tipc_sk_rcv - handle incoming message
- * @skb: buffer containing arriving message
- * Consumes buffer
- * Returns 0 if success, or errno: -EHOSTUNREACH
+ * tipc_sk_enqueue - extract all buffers with destination 'dport' from
+ *                   inputq and try adding them to socket or backlog queue
+ * @inputq: list of incoming buffers with potentially different destinations
+ * @sk: socket where the buffers should be enqueued
+ * @dport: port number for the socket
+ * @_skb: returned buffer to be forwarded or rejected, if applicable
+ *
+ * Caller must hold socket lock
+ *
+ * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
+ * or -TIPC_ERR_NO_PORT
  */
-int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
+static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
+                          u32 dport, struct sk_buff **_skb)
 {
-       struct tipc_sock *tsk;
-       struct sock *sk;
-       u32 dport = msg_destport(buf_msg(skb));
-       int rc = TIPC_OK;
-       uint limit;
-       u32 dnode;
+       unsigned int lim;
+       atomic_t *dcnt;
+       int err;
+       struct sk_buff *skb;
+       unsigned long time_limit = jiffies + 2;
 
-       /* Validate destination and message */
-       tsk = tipc_sk_lookup(net, dport);
-       if (unlikely(!tsk)) {
-               rc = tipc_msg_eval(net, skb, &dnode);
-               goto exit;
+       while (skb_queue_len(inputq)) {
+               if (unlikely(time_after_eq(jiffies, time_limit)))
+                       return TIPC_OK;
+               skb = tipc_skb_dequeue(inputq, dport);
+               if (unlikely(!skb))
+                       return TIPC_OK;
+               if (!sock_owned_by_user(sk)) {
+                       err = filter_rcv(sk, &skb);
+                       if (likely(!skb))
+                               continue;
+                       *_skb = skb;
+                       return err;
+               }
+               dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+               if (sk->sk_backlog.len)
+                       atomic_set(dcnt, 0);
+               lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+               if (likely(!sk_add_backlog(sk, skb, lim)))
+                       continue;
+               *_skb = skb;
+               return -TIPC_ERR_OVERLOAD;
        }
-       sk = &tsk->sk;
+       return TIPC_OK;
+}
 
-       /* Queue message */
-       spin_lock_bh(&sk->sk_lock.slock);
+/**
+ * tipc_sk_rcv - handle a chain of incoming buffers
+ * @inputq: buffer list containing the buffers
+ * Consumes all buffers in list until inputq is empty
+ * Note: may be called in multiple threads referring to the same queue
+ * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
+ * Only node local calls check the return value, sending single-buffer queues
+ */
+int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
+{
+       u32 dnode, dport = 0;
+       int err = -TIPC_ERR_NO_PORT;
+       struct sk_buff *skb;
+       struct tipc_sock *tsk;
+       struct tipc_net *tn;
+       struct sock *sk;
 
-       if (!sock_owned_by_user(sk)) {
-               rc = filter_rcv(sk, skb);
-       } else {
-               if (sk->sk_backlog.len == 0)
-                       atomic_set(&tsk->dupl_rcvcnt, 0);
-               limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
-               if (sk_add_backlog(sk, skb, limit))
-                       rc = -TIPC_ERR_OVERLOAD;
+       while (skb_queue_len(inputq)) {
+               skb = NULL;
+               dport = tipc_skb_peek_port(inputq, dport);
+               tsk = tipc_sk_lookup(net, dport);
+               if (likely(tsk)) {
+                       sk = &tsk->sk;
+                       if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
+                               err = tipc_sk_enqueue(inputq, sk, dport, &skb);
+                               spin_unlock_bh(&sk->sk_lock.slock);
+                               dport = 0;
+                       }
+                       sock_put(sk);
+               } else {
+                       skb = tipc_skb_dequeue(inputq, dport);
+               }
+               if (likely(!skb))
+                       continue;
+               if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
+                       goto xmit;
+               if (!err) {
+                       dnode = msg_destnode(buf_msg(skb));
+                       goto xmit;
+               }
+               tn = net_generic(net, tipc_net_id);
+               if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
+                       continue;
+xmit:
+               tipc_link_xmit_skb(net, skb, dnode, dport);
        }
-       spin_unlock_bh(&sk->sk_lock.slock);
-       sock_put(sk);
-       if (likely(!rc))
-               return 0;
-exit:
-       if ((rc < 0) && !tipc_msg_reverse(net, skb, &dnode, -rc))
-               return -EHOSTUNREACH;
-
-       tipc_link_xmit_skb(net, skb, dnode, 0);
-       return (rc < 0) ? -EHOSTUNREACH : 0;
+       return err ? -EHOSTUNREACH : 0;
 }
 
 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
@@ -2055,7 +2128,6 @@ static int tipc_shutdown(struct socket *sock, int how)
 {
        struct sock *sk = sock->sk;
        struct net *net = sock_net(sk);
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_sock *tsk = tipc_sk(sk);
        struct sk_buff *skb;
        u32 dnode;
@@ -2078,16 +2150,17 @@ restart:
                                kfree_skb(skb);
                                goto restart;
                        }
-                       if (tipc_msg_reverse(net, skb, &dnode,
+                       if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
                                             TIPC_CONN_SHUTDOWN))
                                tipc_link_xmit_skb(net, skb, dnode,
                                                   tsk->portid);
                        tipc_node_remove_conn(net, dnode, tsk->portid);
                } else {
                        dnode = tsk_peer_node(tsk);
-                       skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
+
+                       skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                              TIPC_CONN_MSG, SHORT_H_SIZE,
-                                             0, dnode, tn->own_addr,
+                                             0, dnode, tsk_own_node(tsk),
                                              tsk_peer_port(tsk),
                                              tsk->portid, TIPC_CONN_SHUTDOWN);
                        tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
@@ -2119,10 +2192,9 @@ static void tipc_sk_timeout(unsigned long data)
 {
        struct tipc_sock *tsk = (struct tipc_sock *)data;
        struct sock *sk = &tsk->sk;
-       struct net *net = sock_net(sk);
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct sk_buff *skb = NULL;
        u32 peer_port, peer_node;
+       u32 own_node = tsk_own_node(tsk);
 
        bh_lock_sock(sk);
        if (!tsk->connected) {
@@ -2134,13 +2206,13 @@ static void tipc_sk_timeout(unsigned long data)
 
        if (tsk->probing_state == TIPC_CONN_PROBING) {
                /* Previous probe not answered -> self abort */
-               skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
+               skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                      TIPC_CONN_MSG, SHORT_H_SIZE, 0,
-                                     tn->own_addr, peer_node, tsk->portid,
+                                     own_node, peer_node, tsk->portid,
                                      peer_port, TIPC_ERR_NO_PORT);
        } else {
-               skb = tipc_msg_create(net, CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
-                                     0, peer_node, tn->own_addr,
+               skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
+                                     INT_H_SIZE, 0, peer_node, own_node,
                                      peer_port, tsk->portid, TIPC_OK);
                tsk->probing_state = TIPC_CONN_PROBING;
                sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
@@ -2208,91 +2280,6 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
        return rc;
 }
 
-static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
-                       int len, int full_id)
-{
-       struct net *net = sock_net(&tsk->sk);
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct publication *publ;
-       int ret;
-
-       if (full_id)
-               ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
-                                   tipc_zone(tn->own_addr),
-                                   tipc_cluster(tn->own_addr),
-                                   tipc_node(tn->own_addr), tsk->portid);
-       else
-               ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
-
-       if (tsk->connected) {
-               u32 dport = tsk_peer_port(tsk);
-               u32 destnode = tsk_peer_node(tsk);
-
-               ret += tipc_snprintf(buf + ret, len - ret,
-                                    " connected to <%u.%u.%u:%u>",
-                                    tipc_zone(destnode),
-                                    tipc_cluster(destnode),
-                                    tipc_node(destnode), dport);
-               if (tsk->conn_type != 0)
-                       ret += tipc_snprintf(buf + ret, len - ret,
-                                            " via {%u,%u}", tsk->conn_type,
-                                            tsk->conn_instance);
-       } else if (tsk->published) {
-               ret += tipc_snprintf(buf + ret, len - ret, " bound to");
-               list_for_each_entry(publ, &tsk->publications, pport_list) {
-                       if (publ->lower == publ->upper)
-                               ret += tipc_snprintf(buf + ret, len - ret,
-                                                    " {%u,%u}", publ->type,
-                                                    publ->lower);
-                       else
-                               ret += tipc_snprintf(buf + ret, len - ret,
-                                                    " {%u,%u,%u}", publ->type,
-                                                    publ->lower, publ->upper);
-               }
-       }
-       ret += tipc_snprintf(buf + ret, len - ret, "\n");
-       return ret;
-}
-
-struct sk_buff *tipc_sk_socks_show(struct net *net)
-{
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       const struct bucket_table *tbl;
-       struct rhash_head *pos;
-       struct sk_buff *buf;
-       struct tlv_desc *rep_tlv;
-       char *pb;
-       int pb_len;
-       struct tipc_sock *tsk;
-       int str_len = 0;
-       int i;
-
-       buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
-       if (!buf)
-               return NULL;
-       rep_tlv = (struct tlv_desc *)buf->data;
-       pb = TLV_DATA(rep_tlv);
-       pb_len = ULTRA_STRING_MAX_LEN;
-
-       rcu_read_lock();
-       tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
-       for (i = 0; i < tbl->size; i++) {
-               rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
-                       spin_lock_bh(&tsk->sk.sk_lock.slock);
-                       str_len += tipc_sk_show(tsk, pb + str_len,
-                                               pb_len - str_len, 0);
-                       spin_unlock_bh(&tsk->sk.sk_lock.slock);
-               }
-       }
-       rcu_read_unlock();
-
-       str_len += 1;   /* for "\0" */
-       skb_put(buf, TLV_SPACE(str_len));
-       TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
-
-       return buf;
-}
-
 /* tipc_sk_reinit: set non-zero address in all existing sockets
  *                 when we go from standalone to network mode.
  */
@@ -2710,7 +2697,7 @@ static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
        struct tipc_net *tn = net_generic(net, tipc_net_id);
 
        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
-                         &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
+                         &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
        if (!hdr)
                goto msg_cancel;
 
@@ -2791,7 +2778,7 @@ static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
        struct nlattr *attrs;
 
        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
-                         &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
+                         &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
        if (!hdr)
                goto msg_cancel;