tipc: make media xmit call outside node spinlock context
[cascardo/linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include "core.h"
39 #include "name_table.h"
40 #include "node.h"
41 #include "link.h"
42 #include "name_distr.h"
43 #include "socket.h"
44 #include "bcast.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_intv:
72  * @conn_timeout: the time we can wait for an unresponded setup request
73  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
74  * @link_cong: non-zero if owner must sleep because of link congestion
75  * @sent_unacked: # messages sent by socket, and not yet acked by peer
76  * @rcv_unacked: # messages read by user, but not yet acked back to peer
77  * @remote: 'connected' peer for dgram/rdm
78  * @node: hash table node
79  * @rcu: rcu struct for tipc_sock
80  */
81 struct tipc_sock {
82         struct sock sk;
83         int connected;
84         u32 conn_type;
85         u32 conn_instance;
86         int published;
87         u32 max_pkt;
88         u32 portid;
89         struct tipc_msg phdr;
90         struct list_head sock_list;
91         struct list_head publications;
92         u32 pub_count;
93         u32 probing_state;
94         unsigned long probing_intv;
95         uint conn_timeout;
96         atomic_t dupl_rcvcnt;
97         bool link_cong;
98         uint sent_unacked;
99         uint rcv_unacked;
100         struct sockaddr_tipc remote;
101         struct rhash_head node;
102         struct rcu_head rcu;
103 };
104
105 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
106 static void tipc_data_ready(struct sock *sk);
107 static void tipc_write_space(struct sock *sk);
108 static int tipc_release(struct socket *sock);
109 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
110 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
111 static void tipc_sk_timeout(unsigned long data);
112 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
113                            struct tipc_name_seq const *seq);
114 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
115                             struct tipc_name_seq const *seq);
116 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
117 static int tipc_sk_insert(struct tipc_sock *tsk);
118 static void tipc_sk_remove(struct tipc_sock *tsk);
119 static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
120                               size_t dsz);
121 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
122
123 static const struct proto_ops packet_ops;
124 static const struct proto_ops stream_ops;
125 static const struct proto_ops msg_ops;
126 static struct proto tipc_proto;
127
128 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
129         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
130         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
131         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
132         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
133         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
134 };
135
136 static const struct rhashtable_params tsk_rht_params;
137
138 /*
139  * Revised TIPC socket locking policy:
140  *
141  * Most socket operations take the standard socket lock when they start
142  * and hold it until they finish (or until they need to sleep).  Acquiring
143  * this lock grants the owner exclusive access to the fields of the socket
144  * data structures, with the exception of the backlog queue.  A few socket
145  * operations can be done without taking the socket lock because they only
146  * read socket information that never changes during the life of the socket.
147  *
148  * Socket operations may acquire the lock for the associated TIPC port if they
149  * need to perform an operation on the port.  If any routine needs to acquire
150  * both the socket lock and the port lock it must take the socket lock first
151  * to avoid the risk of deadlock.
152  *
153  * The dispatcher handling incoming messages cannot grab the socket lock in
154  * the standard fashion, since invoked it runs at the BH level and cannot block.
155  * Instead, it checks to see if the socket lock is currently owned by someone,
156  * and either handles the message itself or adds it to the socket's backlog
157  * queue; in the latter case the queued message is processed once the process
158  * owning the socket lock releases it.
159  *
160  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
161  * the problem of a blocked socket operation preventing any other operations
162  * from occurring.  However, applications must be careful if they have
163  * multiple threads trying to send (or receive) on the same socket, as these
164  * operations might interfere with each other.  For example, doing a connect
165  * and a receive at the same time might allow the receive to consume the
166  * ACK message meant for the connect.  While additional work could be done
167  * to try and overcome this, it doesn't seem to be worthwhile at the present.
168  *
169  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
170  * that another operation that must be performed in a non-blocking manner is
171  * not delayed for very long because the lock has already been taken.
172  *
173  * NOTE: This code assumes that certain fields of a port/socket pair are
174  * constant over its lifetime; such fields can be examined without taking
175  * the socket lock and/or port lock, and do not need to be re-read even
176  * after resuming processing after waiting.  These fields include:
177  *   - socket type
178  *   - pointer to socket sk structure (aka tipc_sock structure)
179  *   - pointer to port structure
180  *   - port reference
181  */
182
183 static u32 tsk_own_node(struct tipc_sock *tsk)
184 {
185         return msg_prevnode(&tsk->phdr);
186 }
187
188 static u32 tsk_peer_node(struct tipc_sock *tsk)
189 {
190         return msg_destnode(&tsk->phdr);
191 }
192
193 static u32 tsk_peer_port(struct tipc_sock *tsk)
194 {
195         return msg_destport(&tsk->phdr);
196 }
197
198 static  bool tsk_unreliable(struct tipc_sock *tsk)
199 {
200         return msg_src_droppable(&tsk->phdr) != 0;
201 }
202
203 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
204 {
205         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
206 }
207
208 static bool tsk_unreturnable(struct tipc_sock *tsk)
209 {
210         return msg_dest_droppable(&tsk->phdr) != 0;
211 }
212
213 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
214 {
215         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
216 }
217
218 static int tsk_importance(struct tipc_sock *tsk)
219 {
220         return msg_importance(&tsk->phdr);
221 }
222
223 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
224 {
225         if (imp > TIPC_CRITICAL_IMPORTANCE)
226                 return -EINVAL;
227         msg_set_importance(&tsk->phdr, (u32)imp);
228         return 0;
229 }
230
231 static struct tipc_sock *tipc_sk(const struct sock *sk)
232 {
233         return container_of(sk, struct tipc_sock, sk);
234 }
235
236 static int tsk_conn_cong(struct tipc_sock *tsk)
237 {
238         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
239 }
240
241 /**
242  * tsk_advance_rx_queue - discard first buffer in socket receive queue
243  *
244  * Caller must hold socket lock
245  */
246 static void tsk_advance_rx_queue(struct sock *sk)
247 {
248         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
249 }
250
251 /**
252  * tsk_rej_rx_queue - reject all buffers in socket receive queue
253  *
254  * Caller must hold socket lock
255  */
256 static void tsk_rej_rx_queue(struct sock *sk)
257 {
258         struct sk_buff *skb;
259         u32 dnode;
260         u32 own_node = tsk_own_node(tipc_sk(sk));
261
262         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
263                 if (tipc_msg_reverse(own_node, skb, &dnode, TIPC_ERR_NO_PORT))
264                         tipc_node_xmit_skb(sock_net(sk), skb, dnode, 0);
265         }
266 }
267
268 /* tsk_peer_msg - verify if message was sent by connected port's peer
269  *
270  * Handles cases where the node's network address has changed from
271  * the default of <0.0.0> to its configured setting.
272  */
273 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
274 {
275         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
276         u32 peer_port = tsk_peer_port(tsk);
277         u32 orig_node;
278         u32 peer_node;
279
280         if (unlikely(!tsk->connected))
281                 return false;
282
283         if (unlikely(msg_origport(msg) != peer_port))
284                 return false;
285
286         orig_node = msg_orignode(msg);
287         peer_node = tsk_peer_node(tsk);
288
289         if (likely(orig_node == peer_node))
290                 return true;
291
292         if (!orig_node && (peer_node == tn->own_addr))
293                 return true;
294
295         if (!peer_node && (orig_node == tn->own_addr))
296                 return true;
297
298         return false;
299 }
300
301 /**
302  * tipc_sk_create - create a TIPC socket
303  * @net: network namespace (must be default network)
304  * @sock: pre-allocated socket structure
305  * @protocol: protocol indicator (must be 0)
306  * @kern: caused by kernel or by userspace?
307  *
308  * This routine creates additional data structures used by the TIPC socket,
309  * initializes them, and links them together.
310  *
311  * Returns 0 on success, errno otherwise
312  */
313 static int tipc_sk_create(struct net *net, struct socket *sock,
314                           int protocol, int kern)
315 {
316         struct tipc_net *tn;
317         const struct proto_ops *ops;
318         socket_state state;
319         struct sock *sk;
320         struct tipc_sock *tsk;
321         struct tipc_msg *msg;
322
323         /* Validate arguments */
324         if (unlikely(protocol != 0))
325                 return -EPROTONOSUPPORT;
326
327         switch (sock->type) {
328         case SOCK_STREAM:
329                 ops = &stream_ops;
330                 state = SS_UNCONNECTED;
331                 break;
332         case SOCK_SEQPACKET:
333                 ops = &packet_ops;
334                 state = SS_UNCONNECTED;
335                 break;
336         case SOCK_DGRAM:
337         case SOCK_RDM:
338                 ops = &msg_ops;
339                 state = SS_READY;
340                 break;
341         default:
342                 return -EPROTOTYPE;
343         }
344
345         /* Allocate socket's protocol area */
346         sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
347         if (sk == NULL)
348                 return -ENOMEM;
349
350         tsk = tipc_sk(sk);
351         tsk->max_pkt = MAX_PKT_DEFAULT;
352         INIT_LIST_HEAD(&tsk->publications);
353         msg = &tsk->phdr;
354         tn = net_generic(sock_net(sk), tipc_net_id);
355         tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
356                       NAMED_H_SIZE, 0);
357
358         /* Finish initializing socket data structures */
359         sock->ops = ops;
360         sock->state = state;
361         sock_init_data(sock, sk);
362         if (tipc_sk_insert(tsk)) {
363                 pr_warn("Socket create failed; port numbrer exhausted\n");
364                 return -EINVAL;
365         }
366         msg_set_origport(msg, tsk->portid);
367         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
368         sk->sk_backlog_rcv = tipc_backlog_rcv;
369         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
370         sk->sk_data_ready = tipc_data_ready;
371         sk->sk_write_space = tipc_write_space;
372         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
373         tsk->sent_unacked = 0;
374         atomic_set(&tsk->dupl_rcvcnt, 0);
375
376         if (sock->state == SS_READY) {
377                 tsk_set_unreturnable(tsk, true);
378                 if (sock->type == SOCK_DGRAM)
379                         tsk_set_unreliable(tsk, true);
380         }
381         return 0;
382 }
383
384 static void tipc_sk_callback(struct rcu_head *head)
385 {
386         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
387
388         sock_put(&tsk->sk);
389 }
390
391 /**
392  * tipc_release - destroy a TIPC socket
393  * @sock: socket to destroy
394  *
395  * This routine cleans up any messages that are still queued on the socket.
396  * For DGRAM and RDM socket types, all queued messages are rejected.
397  * For SEQPACKET and STREAM socket types, the first message is rejected
398  * and any others are discarded.  (If the first message on a STREAM socket
399  * is partially-read, it is discarded and the next one is rejected instead.)
400  *
401  * NOTE: Rejected messages are not necessarily returned to the sender!  They
402  * are returned or discarded according to the "destination droppable" setting
403  * specified for the message by the sender.
404  *
405  * Returns 0 on success, errno otherwise
406  */
407 static int tipc_release(struct socket *sock)
408 {
409         struct sock *sk = sock->sk;
410         struct net *net;
411         struct tipc_sock *tsk;
412         struct sk_buff *skb;
413         u32 dnode;
414
415         /*
416          * Exit if socket isn't fully initialized (occurs when a failed accept()
417          * releases a pre-allocated child socket that was never used)
418          */
419         if (sk == NULL)
420                 return 0;
421
422         net = sock_net(sk);
423         tsk = tipc_sk(sk);
424         lock_sock(sk);
425
426         /*
427          * Reject all unreceived messages, except on an active connection
428          * (which disconnects locally & sends a 'FIN+' to peer)
429          */
430         dnode = tsk_peer_node(tsk);
431         while (sock->state != SS_DISCONNECTING) {
432                 skb = __skb_dequeue(&sk->sk_receive_queue);
433                 if (skb == NULL)
434                         break;
435                 if (TIPC_SKB_CB(skb)->handle != NULL)
436                         kfree_skb(skb);
437                 else {
438                         if ((sock->state == SS_CONNECTING) ||
439                             (sock->state == SS_CONNECTED)) {
440                                 sock->state = SS_DISCONNECTING;
441                                 tsk->connected = 0;
442                                 tipc_node_remove_conn(net, dnode, tsk->portid);
443                         }
444                         if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
445                                              TIPC_ERR_NO_PORT))
446                                 tipc_node_xmit_skb(net, skb, dnode, 0);
447                 }
448         }
449
450         tipc_sk_withdraw(tsk, 0, NULL);
451         sk_stop_timer(sk, &sk->sk_timer);
452         tipc_sk_remove(tsk);
453         if (tsk->connected) {
454                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
455                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
456                                       tsk_own_node(tsk), tsk_peer_port(tsk),
457                                       tsk->portid, TIPC_ERR_NO_PORT);
458                 if (skb)
459                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
460                 tipc_node_remove_conn(net, dnode, tsk->portid);
461         }
462
463         /* Discard any remaining (connection-based) messages in receive queue */
464         __skb_queue_purge(&sk->sk_receive_queue);
465
466         /* Reject any messages that accumulated in backlog queue */
467         sock->state = SS_DISCONNECTING;
468         release_sock(sk);
469
470         call_rcu(&tsk->rcu, tipc_sk_callback);
471         sock->sk = NULL;
472
473         return 0;
474 }
475
476 /**
477  * tipc_bind - associate or disassocate TIPC name(s) with a socket
478  * @sock: socket structure
479  * @uaddr: socket address describing name(s) and desired operation
480  * @uaddr_len: size of socket address data structure
481  *
482  * Name and name sequence binding is indicated using a positive scope value;
483  * a negative scope value unbinds the specified name.  Specifying no name
484  * (i.e. a socket address length of 0) unbinds all names from the socket.
485  *
486  * Returns 0 on success, errno otherwise
487  *
488  * NOTE: This routine doesn't need to take the socket lock since it doesn't
489  *       access any non-constant socket information.
490  */
491 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
492                      int uaddr_len)
493 {
494         struct sock *sk = sock->sk;
495         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
496         struct tipc_sock *tsk = tipc_sk(sk);
497         int res = -EINVAL;
498
499         lock_sock(sk);
500         if (unlikely(!uaddr_len)) {
501                 res = tipc_sk_withdraw(tsk, 0, NULL);
502                 goto exit;
503         }
504
505         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
506                 res = -EINVAL;
507                 goto exit;
508         }
509         if (addr->family != AF_TIPC) {
510                 res = -EAFNOSUPPORT;
511                 goto exit;
512         }
513
514         if (addr->addrtype == TIPC_ADDR_NAME)
515                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
516         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
517                 res = -EAFNOSUPPORT;
518                 goto exit;
519         }
520
521         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
522             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
523             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
524                 res = -EACCES;
525                 goto exit;
526         }
527
528         res = (addr->scope > 0) ?
529                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
530                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
531 exit:
532         release_sock(sk);
533         return res;
534 }
535
536 /**
537  * tipc_getname - get port ID of socket or peer socket
538  * @sock: socket structure
539  * @uaddr: area for returned socket address
540  * @uaddr_len: area for returned length of socket address
541  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
542  *
543  * Returns 0 on success, errno otherwise
544  *
545  * NOTE: This routine doesn't need to take the socket lock since it only
546  *       accesses socket information that is unchanging (or which changes in
547  *       a completely predictable manner).
548  */
549 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
550                         int *uaddr_len, int peer)
551 {
552         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
553         struct tipc_sock *tsk = tipc_sk(sock->sk);
554         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
555
556         memset(addr, 0, sizeof(*addr));
557         if (peer) {
558                 if ((sock->state != SS_CONNECTED) &&
559                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
560                         return -ENOTCONN;
561                 addr->addr.id.ref = tsk_peer_port(tsk);
562                 addr->addr.id.node = tsk_peer_node(tsk);
563         } else {
564                 addr->addr.id.ref = tsk->portid;
565                 addr->addr.id.node = tn->own_addr;
566         }
567
568         *uaddr_len = sizeof(*addr);
569         addr->addrtype = TIPC_ADDR_ID;
570         addr->family = AF_TIPC;
571         addr->scope = 0;
572         addr->addr.name.domain = 0;
573
574         return 0;
575 }
576
577 /**
578  * tipc_poll - read and possibly block on pollmask
579  * @file: file structure associated with the socket
580  * @sock: socket for which to calculate the poll bits
581  * @wait: ???
582  *
583  * Returns pollmask value
584  *
585  * COMMENTARY:
586  * It appears that the usual socket locking mechanisms are not useful here
587  * since the pollmask info is potentially out-of-date the moment this routine
588  * exits.  TCP and other protocols seem to rely on higher level poll routines
589  * to handle any preventable race conditions, so TIPC will do the same ...
590  *
591  * TIPC sets the returned events as follows:
592  *
593  * socket state         flags set
594  * ------------         ---------
595  * unconnected          no read flags
596  *                      POLLOUT if port is not congested
597  *
598  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
599  *                      no write flags
600  *
601  * connected            POLLIN/POLLRDNORM if data in rx queue
602  *                      POLLOUT if port is not congested
603  *
604  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
605  *                      no write flags
606  *
607  * listening            POLLIN if SYN in rx queue
608  *                      no write flags
609  *
610  * ready                POLLIN/POLLRDNORM if data in rx queue
611  * [connectionless]     POLLOUT (since port cannot be congested)
612  *
613  * IMPORTANT: The fact that a read or write operation is indicated does NOT
614  * imply that the operation will succeed, merely that it should be performed
615  * and will not block.
616  */
617 static unsigned int tipc_poll(struct file *file, struct socket *sock,
618                               poll_table *wait)
619 {
620         struct sock *sk = sock->sk;
621         struct tipc_sock *tsk = tipc_sk(sk);
622         u32 mask = 0;
623
624         sock_poll_wait(file, sk_sleep(sk), wait);
625
626         switch ((int)sock->state) {
627         case SS_UNCONNECTED:
628                 if (!tsk->link_cong)
629                         mask |= POLLOUT;
630                 break;
631         case SS_READY:
632         case SS_CONNECTED:
633                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
634                         mask |= POLLOUT;
635                 /* fall thru' */
636         case SS_CONNECTING:
637         case SS_LISTENING:
638                 if (!skb_queue_empty(&sk->sk_receive_queue))
639                         mask |= (POLLIN | POLLRDNORM);
640                 break;
641         case SS_DISCONNECTING:
642                 mask = (POLLIN | POLLRDNORM | POLLHUP);
643                 break;
644         }
645
646         return mask;
647 }
648
649 /**
650  * tipc_sendmcast - send multicast message
651  * @sock: socket structure
652  * @seq: destination address
653  * @msg: message to send
654  * @dsz: total length of message data
655  * @timeo: timeout to wait for wakeup
656  *
657  * Called from function tipc_sendmsg(), which has done all sanity checks
658  * Returns the number of bytes sent on success, or errno
659  */
660 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
661                           struct msghdr *msg, size_t dsz, long timeo)
662 {
663         struct sock *sk = sock->sk;
664         struct tipc_sock *tsk = tipc_sk(sk);
665         struct net *net = sock_net(sk);
666         struct tipc_msg *mhdr = &tsk->phdr;
667         struct sk_buff_head *pktchain = &sk->sk_write_queue;
668         struct iov_iter save = msg->msg_iter;
669         uint mtu;
670         int rc;
671
672         msg_set_type(mhdr, TIPC_MCAST_MSG);
673         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
674         msg_set_destport(mhdr, 0);
675         msg_set_destnode(mhdr, 0);
676         msg_set_nametype(mhdr, seq->type);
677         msg_set_namelower(mhdr, seq->lower);
678         msg_set_nameupper(mhdr, seq->upper);
679         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
680
681 new_mtu:
682         mtu = tipc_bclink_get_mtu();
683         rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
684         if (unlikely(rc < 0))
685                 return rc;
686
687         do {
688                 rc = tipc_bclink_xmit(net, pktchain);
689                 if (likely(!rc))
690                         return dsz;
691
692                 if (rc == -ELINKCONG) {
693                         tsk->link_cong = 1;
694                         rc = tipc_wait_for_sndmsg(sock, &timeo);
695                         if (!rc)
696                                 continue;
697                 }
698                 __skb_queue_purge(pktchain);
699                 if (rc == -EMSGSIZE) {
700                         msg->msg_iter = save;
701                         goto new_mtu;
702                 }
703                 break;
704         } while (1);
705         return rc;
706 }
707
708 /**
709  * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
710  * @arrvq: queue with arriving messages, to be cloned after destination lookup
711  * @inputq: queue with cloned messages, delivered to socket after dest lookup
712  *
713  * Multi-threaded: parallel calls with reference to same queues may occur
714  */
715 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
716                        struct sk_buff_head *inputq)
717 {
718         struct tipc_msg *msg;
719         struct tipc_plist dports;
720         u32 portid;
721         u32 scope = TIPC_CLUSTER_SCOPE;
722         struct sk_buff_head tmpq;
723         uint hsz;
724         struct sk_buff *skb, *_skb;
725
726         __skb_queue_head_init(&tmpq);
727         tipc_plist_init(&dports);
728
729         skb = tipc_skb_peek(arrvq, &inputq->lock);
730         for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
731                 msg = buf_msg(skb);
732                 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
733
734                 if (in_own_node(net, msg_orignode(msg)))
735                         scope = TIPC_NODE_SCOPE;
736
737                 /* Create destination port list and message clones: */
738                 tipc_nametbl_mc_translate(net,
739                                           msg_nametype(msg), msg_namelower(msg),
740                                           msg_nameupper(msg), scope, &dports);
741                 portid = tipc_plist_pop(&dports);
742                 for (; portid; portid = tipc_plist_pop(&dports)) {
743                         _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
744                         if (_skb) {
745                                 msg_set_destport(buf_msg(_skb), portid);
746                                 __skb_queue_tail(&tmpq, _skb);
747                                 continue;
748                         }
749                         pr_warn("Failed to clone mcast rcv buffer\n");
750                 }
751                 /* Append to inputq if not already done by other thread */
752                 spin_lock_bh(&inputq->lock);
753                 if (skb_peek(arrvq) == skb) {
754                         skb_queue_splice_tail_init(&tmpq, inputq);
755                         kfree_skb(__skb_dequeue(arrvq));
756                 }
757                 spin_unlock_bh(&inputq->lock);
758                 __skb_queue_purge(&tmpq);
759                 kfree_skb(skb);
760         }
761         tipc_sk_rcv(net, inputq);
762 }
763
764 /**
765  * tipc_sk_proto_rcv - receive a connection mng protocol message
766  * @tsk: receiving socket
767  * @skb: pointer to message buffer. Set to NULL if buffer is consumed.
768  */
769 static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff **skb)
770 {
771         struct tipc_msg *msg = buf_msg(*skb);
772         int conn_cong;
773         u32 dnode;
774         u32 own_node = tsk_own_node(tsk);
775         /* Ignore if connection cannot be validated: */
776         if (!tsk_peer_msg(tsk, msg))
777                 goto exit;
778
779         tsk->probing_state = TIPC_CONN_OK;
780
781         if (msg_type(msg) == CONN_ACK) {
782                 conn_cong = tsk_conn_cong(tsk);
783                 tsk->sent_unacked -= msg_msgcnt(msg);
784                 if (conn_cong)
785                         tsk->sk.sk_write_space(&tsk->sk);
786         } else if (msg_type(msg) == CONN_PROBE) {
787                 if (tipc_msg_reverse(own_node, *skb, &dnode, TIPC_OK)) {
788                         msg_set_type(msg, CONN_PROBE_REPLY);
789                         return;
790                 }
791         }
792         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
793 exit:
794         kfree_skb(*skb);
795         *skb = NULL;
796 }
797
798 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
799 {
800         struct sock *sk = sock->sk;
801         struct tipc_sock *tsk = tipc_sk(sk);
802         DEFINE_WAIT(wait);
803         int done;
804
805         do {
806                 int err = sock_error(sk);
807                 if (err)
808                         return err;
809                 if (sock->state == SS_DISCONNECTING)
810                         return -EPIPE;
811                 if (!*timeo_p)
812                         return -EAGAIN;
813                 if (signal_pending(current))
814                         return sock_intr_errno(*timeo_p);
815
816                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
817                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
818                 finish_wait(sk_sleep(sk), &wait);
819         } while (!done);
820         return 0;
821 }
822
823 /**
824  * tipc_sendmsg - send message in connectionless manner
825  * @sock: socket structure
826  * @m: message to send
827  * @dsz: amount of user data to be sent
828  *
829  * Message must have an destination specified explicitly.
830  * Used for SOCK_RDM and SOCK_DGRAM messages,
831  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
832  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
833  *
834  * Returns the number of bytes sent on success, or errno otherwise
835  */
836 static int tipc_sendmsg(struct socket *sock,
837                         struct msghdr *m, size_t dsz)
838 {
839         struct sock *sk = sock->sk;
840         int ret;
841
842         lock_sock(sk);
843         ret = __tipc_sendmsg(sock, m, dsz);
844         release_sock(sk);
845
846         return ret;
847 }
848
849 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
850 {
851         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
852         struct sock *sk = sock->sk;
853         struct tipc_sock *tsk = tipc_sk(sk);
854         struct net *net = sock_net(sk);
855         struct tipc_msg *mhdr = &tsk->phdr;
856         u32 dnode, dport;
857         struct sk_buff_head *pktchain = &sk->sk_write_queue;
858         struct sk_buff *skb;
859         struct tipc_name_seq *seq;
860         struct iov_iter save;
861         u32 mtu;
862         long timeo;
863         int rc;
864
865         if (dsz > TIPC_MAX_USER_MSG_SIZE)
866                 return -EMSGSIZE;
867         if (unlikely(!dest)) {
868                 if (tsk->connected && sock->state == SS_READY)
869                         dest = &tsk->remote;
870                 else
871                         return -EDESTADDRREQ;
872         } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
873                    dest->family != AF_TIPC) {
874                 return -EINVAL;
875         }
876         if (unlikely(sock->state != SS_READY)) {
877                 if (sock->state == SS_LISTENING)
878                         return -EPIPE;
879                 if (sock->state != SS_UNCONNECTED)
880                         return -EISCONN;
881                 if (tsk->published)
882                         return -EOPNOTSUPP;
883                 if (dest->addrtype == TIPC_ADDR_NAME) {
884                         tsk->conn_type = dest->addr.name.name.type;
885                         tsk->conn_instance = dest->addr.name.name.instance;
886                 }
887         }
888         seq = &dest->addr.nameseq;
889         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
890
891         if (dest->addrtype == TIPC_ADDR_MCAST) {
892                 return tipc_sendmcast(sock, seq, m, dsz, timeo);
893         } else if (dest->addrtype == TIPC_ADDR_NAME) {
894                 u32 type = dest->addr.name.name.type;
895                 u32 inst = dest->addr.name.name.instance;
896                 u32 domain = dest->addr.name.domain;
897
898                 dnode = domain;
899                 msg_set_type(mhdr, TIPC_NAMED_MSG);
900                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
901                 msg_set_nametype(mhdr, type);
902                 msg_set_nameinst(mhdr, inst);
903                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
904                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
905                 msg_set_destnode(mhdr, dnode);
906                 msg_set_destport(mhdr, dport);
907                 if (unlikely(!dport && !dnode))
908                         return -EHOSTUNREACH;
909         } else if (dest->addrtype == TIPC_ADDR_ID) {
910                 dnode = dest->addr.id.node;
911                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
912                 msg_set_lookup_scope(mhdr, 0);
913                 msg_set_destnode(mhdr, dnode);
914                 msg_set_destport(mhdr, dest->addr.id.ref);
915                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
916         }
917
918         save = m->msg_iter;
919 new_mtu:
920         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
921         rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, pktchain);
922         if (rc < 0)
923                 return rc;
924
925         do {
926                 skb = skb_peek(pktchain);
927                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
928                 rc = tipc_node_xmit(net, pktchain, dnode, tsk->portid);
929                 if (likely(!rc)) {
930                         if (sock->state != SS_READY)
931                                 sock->state = SS_CONNECTING;
932                         return dsz;
933                 }
934                 if (rc == -ELINKCONG) {
935                         tsk->link_cong = 1;
936                         rc = tipc_wait_for_sndmsg(sock, &timeo);
937                         if (!rc)
938                                 continue;
939                 }
940                 __skb_queue_purge(pktchain);
941                 if (rc == -EMSGSIZE) {
942                         m->msg_iter = save;
943                         goto new_mtu;
944                 }
945                 break;
946         } while (1);
947
948         return rc;
949 }
950
951 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
952 {
953         struct sock *sk = sock->sk;
954         struct tipc_sock *tsk = tipc_sk(sk);
955         DEFINE_WAIT(wait);
956         int done;
957
958         do {
959                 int err = sock_error(sk);
960                 if (err)
961                         return err;
962                 if (sock->state == SS_DISCONNECTING)
963                         return -EPIPE;
964                 else if (sock->state != SS_CONNECTED)
965                         return -ENOTCONN;
966                 if (!*timeo_p)
967                         return -EAGAIN;
968                 if (signal_pending(current))
969                         return sock_intr_errno(*timeo_p);
970
971                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
972                 done = sk_wait_event(sk, timeo_p,
973                                      (!tsk->link_cong &&
974                                       !tsk_conn_cong(tsk)) ||
975                                      !tsk->connected);
976                 finish_wait(sk_sleep(sk), &wait);
977         } while (!done);
978         return 0;
979 }
980
981 /**
982  * tipc_send_stream - send stream-oriented data
983  * @sock: socket structure
984  * @m: data to send
985  * @dsz: total length of data to be transmitted
986  *
987  * Used for SOCK_STREAM data.
988  *
989  * Returns the number of bytes sent on success (or partial success),
990  * or errno if no data sent
991  */
992 static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
993 {
994         struct sock *sk = sock->sk;
995         int ret;
996
997         lock_sock(sk);
998         ret = __tipc_send_stream(sock, m, dsz);
999         release_sock(sk);
1000
1001         return ret;
1002 }
1003
1004 static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
1005 {
1006         struct sock *sk = sock->sk;
1007         struct net *net = sock_net(sk);
1008         struct tipc_sock *tsk = tipc_sk(sk);
1009         struct tipc_msg *mhdr = &tsk->phdr;
1010         struct sk_buff_head *pktchain = &sk->sk_write_queue;
1011         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1012         u32 portid = tsk->portid;
1013         int rc = -EINVAL;
1014         long timeo;
1015         u32 dnode;
1016         uint mtu, send, sent = 0;
1017         struct iov_iter save;
1018
1019         /* Handle implied connection establishment */
1020         if (unlikely(dest)) {
1021                 rc = __tipc_sendmsg(sock, m, dsz);
1022                 if (dsz && (dsz == rc))
1023                         tsk->sent_unacked = 1;
1024                 return rc;
1025         }
1026         if (dsz > (uint)INT_MAX)
1027                 return -EMSGSIZE;
1028
1029         if (unlikely(sock->state != SS_CONNECTED)) {
1030                 if (sock->state == SS_DISCONNECTING)
1031                         return -EPIPE;
1032                 else
1033                         return -ENOTCONN;
1034         }
1035
1036         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1037         dnode = tsk_peer_node(tsk);
1038
1039 next:
1040         save = m->msg_iter;
1041         mtu = tsk->max_pkt;
1042         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1043         rc = tipc_msg_build(mhdr, m, sent, send, mtu, pktchain);
1044         if (unlikely(rc < 0))
1045                 return rc;
1046         do {
1047                 if (likely(!tsk_conn_cong(tsk))) {
1048                         rc = tipc_node_xmit(net, pktchain, dnode, portid);
1049                         if (likely(!rc)) {
1050                                 tsk->sent_unacked++;
1051                                 sent += send;
1052                                 if (sent == dsz)
1053                                         return dsz;
1054                                 goto next;
1055                         }
1056                         if (rc == -EMSGSIZE) {
1057                                 __skb_queue_purge(pktchain);
1058                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1059                                                                  portid);
1060                                 m->msg_iter = save;
1061                                 goto next;
1062                         }
1063                         if (rc != -ELINKCONG)
1064                                 break;
1065
1066                         tsk->link_cong = 1;
1067                 }
1068                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1069         } while (!rc);
1070
1071         __skb_queue_purge(pktchain);
1072         return sent ? sent : rc;
1073 }
1074
1075 /**
1076  * tipc_send_packet - send a connection-oriented message
1077  * @sock: socket structure
1078  * @m: message to send
1079  * @dsz: length of data to be transmitted
1080  *
1081  * Used for SOCK_SEQPACKET messages.
1082  *
1083  * Returns the number of bytes sent on success, or errno otherwise
1084  */
1085 static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1086 {
1087         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1088                 return -EMSGSIZE;
1089
1090         return tipc_send_stream(sock, m, dsz);
1091 }
1092
1093 /* tipc_sk_finish_conn - complete the setup of a connection
1094  */
1095 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1096                                 u32 peer_node)
1097 {
1098         struct sock *sk = &tsk->sk;
1099         struct net *net = sock_net(sk);
1100         struct tipc_msg *msg = &tsk->phdr;
1101
1102         msg_set_destnode(msg, peer_node);
1103         msg_set_destport(msg, peer_port);
1104         msg_set_type(msg, TIPC_CONN_MSG);
1105         msg_set_lookup_scope(msg, 0);
1106         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1107
1108         tsk->probing_intv = CONN_PROBING_INTERVAL;
1109         tsk->probing_state = TIPC_CONN_OK;
1110         tsk->connected = 1;
1111         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1112         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1113         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1114 }
1115
1116 /**
1117  * set_orig_addr - capture sender's address for received message
1118  * @m: descriptor for message info
1119  * @msg: received message header
1120  *
1121  * Note: Address is not captured if not requested by receiver.
1122  */
1123 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1124 {
1125         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1126
1127         if (addr) {
1128                 addr->family = AF_TIPC;
1129                 addr->addrtype = TIPC_ADDR_ID;
1130                 memset(&addr->addr, 0, sizeof(addr->addr));
1131                 addr->addr.id.ref = msg_origport(msg);
1132                 addr->addr.id.node = msg_orignode(msg);
1133                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1134                 addr->scope = 0;                /* could leave uninitialized */
1135                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1136         }
1137 }
1138
1139 /**
1140  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1141  * @m: descriptor for message info
1142  * @msg: received message header
1143  * @tsk: TIPC port associated with message
1144  *
1145  * Note: Ancillary data is not captured if not requested by receiver.
1146  *
1147  * Returns 0 if successful, otherwise errno
1148  */
1149 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1150                                  struct tipc_sock *tsk)
1151 {
1152         u32 anc_data[3];
1153         u32 err;
1154         u32 dest_type;
1155         int has_name;
1156         int res;
1157
1158         if (likely(m->msg_controllen == 0))
1159                 return 0;
1160
1161         /* Optionally capture errored message object(s) */
1162         err = msg ? msg_errcode(msg) : 0;
1163         if (unlikely(err)) {
1164                 anc_data[0] = err;
1165                 anc_data[1] = msg_data_sz(msg);
1166                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1167                 if (res)
1168                         return res;
1169                 if (anc_data[1]) {
1170                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1171                                        msg_data(msg));
1172                         if (res)
1173                                 return res;
1174                 }
1175         }
1176
1177         /* Optionally capture message destination object */
1178         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1179         switch (dest_type) {
1180         case TIPC_NAMED_MSG:
1181                 has_name = 1;
1182                 anc_data[0] = msg_nametype(msg);
1183                 anc_data[1] = msg_namelower(msg);
1184                 anc_data[2] = msg_namelower(msg);
1185                 break;
1186         case TIPC_MCAST_MSG:
1187                 has_name = 1;
1188                 anc_data[0] = msg_nametype(msg);
1189                 anc_data[1] = msg_namelower(msg);
1190                 anc_data[2] = msg_nameupper(msg);
1191                 break;
1192         case TIPC_CONN_MSG:
1193                 has_name = (tsk->conn_type != 0);
1194                 anc_data[0] = tsk->conn_type;
1195                 anc_data[1] = tsk->conn_instance;
1196                 anc_data[2] = tsk->conn_instance;
1197                 break;
1198         default:
1199                 has_name = 0;
1200         }
1201         if (has_name) {
1202                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1203                 if (res)
1204                         return res;
1205         }
1206
1207         return 0;
1208 }
1209
1210 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1211 {
1212         struct net *net = sock_net(&tsk->sk);
1213         struct sk_buff *skb = NULL;
1214         struct tipc_msg *msg;
1215         u32 peer_port = tsk_peer_port(tsk);
1216         u32 dnode = tsk_peer_node(tsk);
1217
1218         if (!tsk->connected)
1219                 return;
1220         skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1221                               dnode, tsk_own_node(tsk), peer_port,
1222                               tsk->portid, TIPC_OK);
1223         if (!skb)
1224                 return;
1225         msg = buf_msg(skb);
1226         msg_set_msgcnt(msg, ack);
1227         tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1228 }
1229
1230 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1231 {
1232         struct sock *sk = sock->sk;
1233         DEFINE_WAIT(wait);
1234         long timeo = *timeop;
1235         int err;
1236
1237         for (;;) {
1238                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1239                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1240                         if (sock->state == SS_DISCONNECTING) {
1241                                 err = -ENOTCONN;
1242                                 break;
1243                         }
1244                         release_sock(sk);
1245                         timeo = schedule_timeout(timeo);
1246                         lock_sock(sk);
1247                 }
1248                 err = 0;
1249                 if (!skb_queue_empty(&sk->sk_receive_queue))
1250                         break;
1251                 err = -EAGAIN;
1252                 if (!timeo)
1253                         break;
1254                 err = sock_intr_errno(timeo);
1255                 if (signal_pending(current))
1256                         break;
1257         }
1258         finish_wait(sk_sleep(sk), &wait);
1259         *timeop = timeo;
1260         return err;
1261 }
1262
1263 /**
1264  * tipc_recvmsg - receive packet-oriented message
1265  * @m: descriptor for message info
1266  * @buf_len: total size of user buffer area
1267  * @flags: receive flags
1268  *
1269  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1270  * If the complete message doesn't fit in user area, truncate it.
1271  *
1272  * Returns size of returned message data, errno otherwise
1273  */
1274 static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1275                         int flags)
1276 {
1277         struct sock *sk = sock->sk;
1278         struct tipc_sock *tsk = tipc_sk(sk);
1279         struct sk_buff *buf;
1280         struct tipc_msg *msg;
1281         long timeo;
1282         unsigned int sz;
1283         u32 err;
1284         int res;
1285
1286         /* Catch invalid receive requests */
1287         if (unlikely(!buf_len))
1288                 return -EINVAL;
1289
1290         lock_sock(sk);
1291
1292         if (unlikely(sock->state == SS_UNCONNECTED)) {
1293                 res = -ENOTCONN;
1294                 goto exit;
1295         }
1296
1297         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1298 restart:
1299
1300         /* Look for a message in receive queue; wait if necessary */
1301         res = tipc_wait_for_rcvmsg(sock, &timeo);
1302         if (res)
1303                 goto exit;
1304
1305         /* Look at first message in receive queue */
1306         buf = skb_peek(&sk->sk_receive_queue);
1307         msg = buf_msg(buf);
1308         sz = msg_data_sz(msg);
1309         err = msg_errcode(msg);
1310
1311         /* Discard an empty non-errored message & try again */
1312         if ((!sz) && (!err)) {
1313                 tsk_advance_rx_queue(sk);
1314                 goto restart;
1315         }
1316
1317         /* Capture sender's address (optional) */
1318         set_orig_addr(m, msg);
1319
1320         /* Capture ancillary data (optional) */
1321         res = tipc_sk_anc_data_recv(m, msg, tsk);
1322         if (res)
1323                 goto exit;
1324
1325         /* Capture message data (if valid) & compute return value (always) */
1326         if (!err) {
1327                 if (unlikely(buf_len < sz)) {
1328                         sz = buf_len;
1329                         m->msg_flags |= MSG_TRUNC;
1330                 }
1331                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1332                 if (res)
1333                         goto exit;
1334                 res = sz;
1335         } else {
1336                 if ((sock->state == SS_READY) ||
1337                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1338                         res = 0;
1339                 else
1340                         res = -ECONNRESET;
1341         }
1342
1343         /* Consume received message (optional) */
1344         if (likely(!(flags & MSG_PEEK))) {
1345                 if ((sock->state != SS_READY) &&
1346                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1347                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1348                         tsk->rcv_unacked = 0;
1349                 }
1350                 tsk_advance_rx_queue(sk);
1351         }
1352 exit:
1353         release_sock(sk);
1354         return res;
1355 }
1356
1357 /**
1358  * tipc_recv_stream - receive stream-oriented data
1359  * @m: descriptor for message info
1360  * @buf_len: total size of user buffer area
1361  * @flags: receive flags
1362  *
1363  * Used for SOCK_STREAM messages only.  If not enough data is available
1364  * will optionally wait for more; never truncates data.
1365  *
1366  * Returns size of returned message data, errno otherwise
1367  */
1368 static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1369                             size_t buf_len, int flags)
1370 {
1371         struct sock *sk = sock->sk;
1372         struct tipc_sock *tsk = tipc_sk(sk);
1373         struct sk_buff *buf;
1374         struct tipc_msg *msg;
1375         long timeo;
1376         unsigned int sz;
1377         int sz_to_copy, target, needed;
1378         int sz_copied = 0;
1379         u32 err;
1380         int res = 0;
1381
1382         /* Catch invalid receive attempts */
1383         if (unlikely(!buf_len))
1384                 return -EINVAL;
1385
1386         lock_sock(sk);
1387
1388         if (unlikely(sock->state == SS_UNCONNECTED)) {
1389                 res = -ENOTCONN;
1390                 goto exit;
1391         }
1392
1393         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1394         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1395
1396 restart:
1397         /* Look for a message in receive queue; wait if necessary */
1398         res = tipc_wait_for_rcvmsg(sock, &timeo);
1399         if (res)
1400                 goto exit;
1401
1402         /* Look at first message in receive queue */
1403         buf = skb_peek(&sk->sk_receive_queue);
1404         msg = buf_msg(buf);
1405         sz = msg_data_sz(msg);
1406         err = msg_errcode(msg);
1407
1408         /* Discard an empty non-errored message & try again */
1409         if ((!sz) && (!err)) {
1410                 tsk_advance_rx_queue(sk);
1411                 goto restart;
1412         }
1413
1414         /* Optionally capture sender's address & ancillary data of first msg */
1415         if (sz_copied == 0) {
1416                 set_orig_addr(m, msg);
1417                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1418                 if (res)
1419                         goto exit;
1420         }
1421
1422         /* Capture message data (if valid) & compute return value (always) */
1423         if (!err) {
1424                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1425
1426                 sz -= offset;
1427                 needed = (buf_len - sz_copied);
1428                 sz_to_copy = (sz <= needed) ? sz : needed;
1429
1430                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1431                                             m, sz_to_copy);
1432                 if (res)
1433                         goto exit;
1434
1435                 sz_copied += sz_to_copy;
1436
1437                 if (sz_to_copy < sz) {
1438                         if (!(flags & MSG_PEEK))
1439                                 TIPC_SKB_CB(buf)->handle =
1440                                 (void *)(unsigned long)(offset + sz_to_copy);
1441                         goto exit;
1442                 }
1443         } else {
1444                 if (sz_copied != 0)
1445                         goto exit; /* can't add error msg to valid data */
1446
1447                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1448                         res = 0;
1449                 else
1450                         res = -ECONNRESET;
1451         }
1452
1453         /* Consume received message (optional) */
1454         if (likely(!(flags & MSG_PEEK))) {
1455                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1456                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1457                         tsk->rcv_unacked = 0;
1458                 }
1459                 tsk_advance_rx_queue(sk);
1460         }
1461
1462         /* Loop around if more data is required */
1463         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1464             (!skb_queue_empty(&sk->sk_receive_queue) ||
1465             (sz_copied < target)) &&    /* and more is ready or required */
1466             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1467             (!err))                     /* and haven't reached a FIN */
1468                 goto restart;
1469
1470 exit:
1471         release_sock(sk);
1472         return sz_copied ? sz_copied : res;
1473 }
1474
1475 /**
1476  * tipc_write_space - wake up thread if port congestion is released
1477  * @sk: socket
1478  */
1479 static void tipc_write_space(struct sock *sk)
1480 {
1481         struct socket_wq *wq;
1482
1483         rcu_read_lock();
1484         wq = rcu_dereference(sk->sk_wq);
1485         if (wq_has_sleeper(wq))
1486                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1487                                                 POLLWRNORM | POLLWRBAND);
1488         rcu_read_unlock();
1489 }
1490
1491 /**
1492  * tipc_data_ready - wake up threads to indicate messages have been received
1493  * @sk: socket
1494  * @len: the length of messages
1495  */
1496 static void tipc_data_ready(struct sock *sk)
1497 {
1498         struct socket_wq *wq;
1499
1500         rcu_read_lock();
1501         wq = rcu_dereference(sk->sk_wq);
1502         if (wq_has_sleeper(wq))
1503                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1504                                                 POLLRDNORM | POLLRDBAND);
1505         rcu_read_unlock();
1506 }
1507
1508 /**
1509  * filter_connect - Handle all incoming messages for a connection-based socket
1510  * @tsk: TIPC socket
1511  * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1512  *
1513  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1514  */
1515 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **skb)
1516 {
1517         struct sock *sk = &tsk->sk;
1518         struct net *net = sock_net(sk);
1519         struct socket *sock = sk->sk_socket;
1520         struct tipc_msg *msg = buf_msg(*skb);
1521         int retval = -TIPC_ERR_NO_PORT;
1522
1523         if (msg_mcast(msg))
1524                 return retval;
1525
1526         switch ((int)sock->state) {
1527         case SS_CONNECTED:
1528                 /* Accept only connection-based messages sent by peer */
1529                 if (tsk_peer_msg(tsk, msg)) {
1530                         if (unlikely(msg_errcode(msg))) {
1531                                 sock->state = SS_DISCONNECTING;
1532                                 tsk->connected = 0;
1533                                 /* let timer expire on it's own */
1534                                 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1535                                                       tsk->portid);
1536                         }
1537                         retval = TIPC_OK;
1538                 }
1539                 break;
1540         case SS_CONNECTING:
1541                 /* Accept only ACK or NACK message */
1542
1543                 if (unlikely(!msg_connected(msg)))
1544                         break;
1545
1546                 if (unlikely(msg_errcode(msg))) {
1547                         sock->state = SS_DISCONNECTING;
1548                         sk->sk_err = ECONNREFUSED;
1549                         retval = TIPC_OK;
1550                         break;
1551                 }
1552
1553                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1554                         sock->state = SS_DISCONNECTING;
1555                         sk->sk_err = EINVAL;
1556                         retval = TIPC_OK;
1557                         break;
1558                 }
1559
1560                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1561                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1562                 sock->state = SS_CONNECTED;
1563
1564                 /* If an incoming message is an 'ACK-', it should be
1565                  * discarded here because it doesn't contain useful
1566                  * data. In addition, we should try to wake up
1567                  * connect() routine if sleeping.
1568                  */
1569                 if (msg_data_sz(msg) == 0) {
1570                         kfree_skb(*skb);
1571                         *skb = NULL;
1572                         if (waitqueue_active(sk_sleep(sk)))
1573                                 wake_up_interruptible(sk_sleep(sk));
1574                 }
1575                 retval = TIPC_OK;
1576                 break;
1577         case SS_LISTENING:
1578         case SS_UNCONNECTED:
1579                 /* Accept only SYN message */
1580                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1581                         retval = TIPC_OK;
1582                 break;
1583         case SS_DISCONNECTING:
1584                 break;
1585         default:
1586                 pr_err("Unknown socket state %u\n", sock->state);
1587         }
1588         return retval;
1589 }
1590
1591 /**
1592  * rcvbuf_limit - get proper overload limit of socket receive queue
1593  * @sk: socket
1594  * @buf: message
1595  *
1596  * For all connection oriented messages, irrespective of importance,
1597  * the default overload value (i.e. 67MB) is set as limit.
1598  *
1599  * For all connectionless messages, by default new queue limits are
1600  * as belows:
1601  *
1602  * TIPC_LOW_IMPORTANCE       (4 MB)
1603  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1604  * TIPC_HIGH_IMPORTANCE      (16 MB)
1605  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1606  *
1607  * Returns overload limit according to corresponding message importance
1608  */
1609 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1610 {
1611         struct tipc_msg *msg = buf_msg(buf);
1612
1613         if (msg_connected(msg))
1614                 return sysctl_tipc_rmem[2];
1615
1616         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1617                 msg_importance(msg);
1618 }
1619
1620 /**
1621  * filter_rcv - validate incoming message
1622  * @sk: socket
1623  * @skb: pointer to message. Set to NULL if buffer is consumed.
1624  *
1625  * Enqueues message on receive queue if acceptable; optionally handles
1626  * disconnect indication for a connected socket.
1627  *
1628  * Called with socket lock already taken
1629  *
1630  * Returns 0 (TIPC_OK) if message was ok, -TIPC error code if rejected
1631  */
1632 static int filter_rcv(struct sock *sk, struct sk_buff **skb)
1633 {
1634         struct socket *sock = sk->sk_socket;
1635         struct tipc_sock *tsk = tipc_sk(sk);
1636         struct tipc_msg *msg = buf_msg(*skb);
1637         unsigned int limit = rcvbuf_limit(sk, *skb);
1638         int rc = TIPC_OK;
1639
1640         if (unlikely(msg_user(msg) == CONN_MANAGER)) {
1641                 tipc_sk_proto_rcv(tsk, skb);
1642                 return TIPC_OK;
1643         }
1644
1645         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1646                 kfree_skb(*skb);
1647                 tsk->link_cong = 0;
1648                 sk->sk_write_space(sk);
1649                 *skb = NULL;
1650                 return TIPC_OK;
1651         }
1652
1653         /* Reject message if it is wrong sort of message for socket */
1654         if (msg_type(msg) > TIPC_DIRECT_MSG)
1655                 return -TIPC_ERR_NO_PORT;
1656
1657         if (sock->state == SS_READY) {
1658                 if (msg_connected(msg))
1659                         return -TIPC_ERR_NO_PORT;
1660         } else {
1661                 rc = filter_connect(tsk, skb);
1662                 if (rc != TIPC_OK || !*skb)
1663                         return rc;
1664         }
1665
1666         /* Reject message if there isn't room to queue it */
1667         if (sk_rmem_alloc_get(sk) + (*skb)->truesize >= limit)
1668                 return -TIPC_ERR_OVERLOAD;
1669
1670         /* Enqueue message */
1671         TIPC_SKB_CB(*skb)->handle = NULL;
1672         __skb_queue_tail(&sk->sk_receive_queue, *skb);
1673         skb_set_owner_r(*skb, sk);
1674
1675         sk->sk_data_ready(sk);
1676         *skb = NULL;
1677         return TIPC_OK;
1678 }
1679
1680 /**
1681  * tipc_backlog_rcv - handle incoming message from backlog queue
1682  * @sk: socket
1683  * @skb: message
1684  *
1685  * Caller must hold socket lock
1686  *
1687  * Returns 0
1688  */
1689 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1690 {
1691         int err;
1692         atomic_t *dcnt;
1693         u32 dnode;
1694         struct tipc_sock *tsk = tipc_sk(sk);
1695         struct net *net = sock_net(sk);
1696         uint truesize = skb->truesize;
1697
1698         err = filter_rcv(sk, &skb);
1699         if (likely(!skb)) {
1700                 dcnt = &tsk->dupl_rcvcnt;
1701                 if (atomic_read(dcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1702                         atomic_add(truesize, dcnt);
1703                 return 0;
1704         }
1705         if (!err || tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode, -err))
1706                 tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
1707         return 0;
1708 }
1709
1710 /**
1711  * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1712  *                   inputq and try adding them to socket or backlog queue
1713  * @inputq: list of incoming buffers with potentially different destinations
1714  * @sk: socket where the buffers should be enqueued
1715  * @dport: port number for the socket
1716  * @_skb: returned buffer to be forwarded or rejected, if applicable
1717  *
1718  * Caller must hold socket lock
1719  *
1720  * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1721  * or -TIPC_ERR_NO_PORT
1722  */
1723 static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1724                            u32 dport, struct sk_buff **_skb)
1725 {
1726         unsigned int lim;
1727         atomic_t *dcnt;
1728         int err;
1729         struct sk_buff *skb;
1730         unsigned long time_limit = jiffies + 2;
1731
1732         while (skb_queue_len(inputq)) {
1733                 if (unlikely(time_after_eq(jiffies, time_limit)))
1734                         return TIPC_OK;
1735                 skb = tipc_skb_dequeue(inputq, dport);
1736                 if (unlikely(!skb))
1737                         return TIPC_OK;
1738                 if (!sock_owned_by_user(sk)) {
1739                         err = filter_rcv(sk, &skb);
1740                         if (likely(!skb))
1741                                 continue;
1742                         *_skb = skb;
1743                         return err;
1744                 }
1745                 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1746                 if (sk->sk_backlog.len)
1747                         atomic_set(dcnt, 0);
1748                 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1749                 if (likely(!sk_add_backlog(sk, skb, lim)))
1750                         continue;
1751                 *_skb = skb;
1752                 return -TIPC_ERR_OVERLOAD;
1753         }
1754         return TIPC_OK;
1755 }
1756
1757 /**
1758  * tipc_sk_rcv - handle a chain of incoming buffers
1759  * @inputq: buffer list containing the buffers
1760  * Consumes all buffers in list until inputq is empty
1761  * Note: may be called in multiple threads referring to the same queue
1762  * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1763  * Only node local calls check the return value, sending single-buffer queues
1764  */
1765 int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1766 {
1767         u32 dnode, dport = 0;
1768         int err;
1769         struct sk_buff *skb;
1770         struct tipc_sock *tsk;
1771         struct tipc_net *tn;
1772         struct sock *sk;
1773
1774         while (skb_queue_len(inputq)) {
1775                 err = -TIPC_ERR_NO_PORT;
1776                 skb = NULL;
1777                 dport = tipc_skb_peek_port(inputq, dport);
1778                 tsk = tipc_sk_lookup(net, dport);
1779                 if (likely(tsk)) {
1780                         sk = &tsk->sk;
1781                         if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1782                                 err = tipc_sk_enqueue(inputq, sk, dport, &skb);
1783                                 spin_unlock_bh(&sk->sk_lock.slock);
1784                                 dport = 0;
1785                         }
1786                         sock_put(sk);
1787                 } else {
1788                         skb = tipc_skb_dequeue(inputq, dport);
1789                 }
1790                 if (likely(!skb))
1791                         continue;
1792                 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1793                         goto xmit;
1794                 if (!err) {
1795                         dnode = msg_destnode(buf_msg(skb));
1796                         goto xmit;
1797                 }
1798                 tn = net_generic(net, tipc_net_id);
1799                 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1800                         continue;
1801 xmit:
1802                 tipc_node_xmit_skb(net, skb, dnode, dport);
1803         }
1804         return err ? -EHOSTUNREACH : 0;
1805 }
1806
1807 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1808 {
1809         struct sock *sk = sock->sk;
1810         DEFINE_WAIT(wait);
1811         int done;
1812
1813         do {
1814                 int err = sock_error(sk);
1815                 if (err)
1816                         return err;
1817                 if (!*timeo_p)
1818                         return -ETIMEDOUT;
1819                 if (signal_pending(current))
1820                         return sock_intr_errno(*timeo_p);
1821
1822                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1823                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1824                 finish_wait(sk_sleep(sk), &wait);
1825         } while (!done);
1826         return 0;
1827 }
1828
1829 /**
1830  * tipc_connect - establish a connection to another TIPC port
1831  * @sock: socket structure
1832  * @dest: socket address for destination port
1833  * @destlen: size of socket address data structure
1834  * @flags: file-related flags associated with socket
1835  *
1836  * Returns 0 on success, errno otherwise
1837  */
1838 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1839                         int destlen, int flags)
1840 {
1841         struct sock *sk = sock->sk;
1842         struct tipc_sock *tsk = tipc_sk(sk);
1843         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1844         struct msghdr m = {NULL,};
1845         long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1846         socket_state previous;
1847         int res = 0;
1848
1849         lock_sock(sk);
1850
1851         /* DGRAM/RDM connect(), just save the destaddr */
1852         if (sock->state == SS_READY) {
1853                 if (dst->family == AF_UNSPEC) {
1854                         memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc));
1855                         tsk->connected = 0;
1856                 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1857                         res = -EINVAL;
1858                 } else {
1859                         memcpy(&tsk->remote, dest, destlen);
1860                         tsk->connected = 1;
1861                 }
1862                 goto exit;
1863         }
1864
1865         /*
1866          * Reject connection attempt using multicast address
1867          *
1868          * Note: send_msg() validates the rest of the address fields,
1869          *       so there's no need to do it here
1870          */
1871         if (dst->addrtype == TIPC_ADDR_MCAST) {
1872                 res = -EINVAL;
1873                 goto exit;
1874         }
1875
1876         previous = sock->state;
1877         switch (sock->state) {
1878         case SS_UNCONNECTED:
1879                 /* Send a 'SYN-' to destination */
1880                 m.msg_name = dest;
1881                 m.msg_namelen = destlen;
1882
1883                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1884                  * indicate send_msg() is never blocked.
1885                  */
1886                 if (!timeout)
1887                         m.msg_flags = MSG_DONTWAIT;
1888
1889                 res = __tipc_sendmsg(sock, &m, 0);
1890                 if ((res < 0) && (res != -EWOULDBLOCK))
1891                         goto exit;
1892
1893                 /* Just entered SS_CONNECTING state; the only
1894                  * difference is that return value in non-blocking
1895                  * case is EINPROGRESS, rather than EALREADY.
1896                  */
1897                 res = -EINPROGRESS;
1898         case SS_CONNECTING:
1899                 if (previous == SS_CONNECTING)
1900                         res = -EALREADY;
1901                 if (!timeout)
1902                         goto exit;
1903                 timeout = msecs_to_jiffies(timeout);
1904                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1905                 res = tipc_wait_for_connect(sock, &timeout);
1906                 break;
1907         case SS_CONNECTED:
1908                 res = -EISCONN;
1909                 break;
1910         default:
1911                 res = -EINVAL;
1912                 break;
1913         }
1914 exit:
1915         release_sock(sk);
1916         return res;
1917 }
1918
1919 /**
1920  * tipc_listen - allow socket to listen for incoming connections
1921  * @sock: socket structure
1922  * @len: (unused)
1923  *
1924  * Returns 0 on success, errno otherwise
1925  */
1926 static int tipc_listen(struct socket *sock, int len)
1927 {
1928         struct sock *sk = sock->sk;
1929         int res;
1930
1931         lock_sock(sk);
1932
1933         if (sock->state != SS_UNCONNECTED)
1934                 res = -EINVAL;
1935         else {
1936                 sock->state = SS_LISTENING;
1937                 res = 0;
1938         }
1939
1940         release_sock(sk);
1941         return res;
1942 }
1943
1944 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1945 {
1946         struct sock *sk = sock->sk;
1947         DEFINE_WAIT(wait);
1948         int err;
1949
1950         /* True wake-one mechanism for incoming connections: only
1951          * one process gets woken up, not the 'whole herd'.
1952          * Since we do not 'race & poll' for established sockets
1953          * anymore, the common case will execute the loop only once.
1954         */
1955         for (;;) {
1956                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1957                                           TASK_INTERRUPTIBLE);
1958                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1959                         release_sock(sk);
1960                         timeo = schedule_timeout(timeo);
1961                         lock_sock(sk);
1962                 }
1963                 err = 0;
1964                 if (!skb_queue_empty(&sk->sk_receive_queue))
1965                         break;
1966                 err = -EINVAL;
1967                 if (sock->state != SS_LISTENING)
1968                         break;
1969                 err = -EAGAIN;
1970                 if (!timeo)
1971                         break;
1972                 err = sock_intr_errno(timeo);
1973                 if (signal_pending(current))
1974                         break;
1975         }
1976         finish_wait(sk_sleep(sk), &wait);
1977         return err;
1978 }
1979
1980 /**
1981  * tipc_accept - wait for connection request
1982  * @sock: listening socket
1983  * @newsock: new socket that is to be connected
1984  * @flags: file-related flags associated with socket
1985  *
1986  * Returns 0 on success, errno otherwise
1987  */
1988 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1989 {
1990         struct sock *new_sk, *sk = sock->sk;
1991         struct sk_buff *buf;
1992         struct tipc_sock *new_tsock;
1993         struct tipc_msg *msg;
1994         long timeo;
1995         int res;
1996
1997         lock_sock(sk);
1998
1999         if (sock->state != SS_LISTENING) {
2000                 res = -EINVAL;
2001                 goto exit;
2002         }
2003         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2004         res = tipc_wait_for_accept(sock, timeo);
2005         if (res)
2006                 goto exit;
2007
2008         buf = skb_peek(&sk->sk_receive_queue);
2009
2010         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2011         if (res)
2012                 goto exit;
2013         security_sk_clone(sock->sk, new_sock->sk);
2014
2015         new_sk = new_sock->sk;
2016         new_tsock = tipc_sk(new_sk);
2017         msg = buf_msg(buf);
2018
2019         /* we lock on new_sk; but lockdep sees the lock on sk */
2020         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2021
2022         /*
2023          * Reject any stray messages received by new socket
2024          * before the socket lock was taken (very, very unlikely)
2025          */
2026         tsk_rej_rx_queue(new_sk);
2027
2028         /* Connect new socket to it's peer */
2029         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2030         new_sock->state = SS_CONNECTED;
2031
2032         tsk_set_importance(new_tsock, msg_importance(msg));
2033         if (msg_named(msg)) {
2034                 new_tsock->conn_type = msg_nametype(msg);
2035                 new_tsock->conn_instance = msg_nameinst(msg);
2036         }
2037
2038         /*
2039          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2040          * Respond to 'SYN+' by queuing it on new socket.
2041          */
2042         if (!msg_data_sz(msg)) {
2043                 struct msghdr m = {NULL,};
2044
2045                 tsk_advance_rx_queue(sk);
2046                 __tipc_send_stream(new_sock, &m, 0);
2047         } else {
2048                 __skb_dequeue(&sk->sk_receive_queue);
2049                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2050                 skb_set_owner_r(buf, new_sk);
2051         }
2052         release_sock(new_sk);
2053 exit:
2054         release_sock(sk);
2055         return res;
2056 }
2057
2058 /**
2059  * tipc_shutdown - shutdown socket connection
2060  * @sock: socket structure
2061  * @how: direction to close (must be SHUT_RDWR)
2062  *
2063  * Terminates connection (if necessary), then purges socket's receive queue.
2064  *
2065  * Returns 0 on success, errno otherwise
2066  */
2067 static int tipc_shutdown(struct socket *sock, int how)
2068 {
2069         struct sock *sk = sock->sk;
2070         struct net *net = sock_net(sk);
2071         struct tipc_sock *tsk = tipc_sk(sk);
2072         struct sk_buff *skb;
2073         u32 dnode;
2074         int res;
2075
2076         if (how != SHUT_RDWR)
2077                 return -EINVAL;
2078
2079         lock_sock(sk);
2080
2081         switch (sock->state) {
2082         case SS_CONNECTING:
2083         case SS_CONNECTED:
2084
2085 restart:
2086                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2087                 skb = __skb_dequeue(&sk->sk_receive_queue);
2088                 if (skb) {
2089                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2090                                 kfree_skb(skb);
2091                                 goto restart;
2092                         }
2093                         if (tipc_msg_reverse(tsk_own_node(tsk), skb, &dnode,
2094                                              TIPC_CONN_SHUTDOWN))
2095                                 tipc_node_xmit_skb(net, skb, dnode,
2096                                                    tsk->portid);
2097                 } else {
2098                         dnode = tsk_peer_node(tsk);
2099
2100                         skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2101                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2102                                               0, dnode, tsk_own_node(tsk),
2103                                               tsk_peer_port(tsk),
2104                                               tsk->portid, TIPC_CONN_SHUTDOWN);
2105                         tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2106                 }
2107                 tsk->connected = 0;
2108                 sock->state = SS_DISCONNECTING;
2109                 tipc_node_remove_conn(net, dnode, tsk->portid);
2110                 /* fall through */
2111
2112         case SS_DISCONNECTING:
2113
2114                 /* Discard any unreceived messages */
2115                 __skb_queue_purge(&sk->sk_receive_queue);
2116
2117                 /* Wake up anyone sleeping in poll */
2118                 sk->sk_state_change(sk);
2119                 res = 0;
2120                 break;
2121
2122         default:
2123                 res = -ENOTCONN;
2124         }
2125
2126         release_sock(sk);
2127         return res;
2128 }
2129
2130 static void tipc_sk_timeout(unsigned long data)
2131 {
2132         struct tipc_sock *tsk = (struct tipc_sock *)data;
2133         struct sock *sk = &tsk->sk;
2134         struct sk_buff *skb = NULL;
2135         u32 peer_port, peer_node;
2136         u32 own_node = tsk_own_node(tsk);
2137
2138         bh_lock_sock(sk);
2139         if (!tsk->connected) {
2140                 bh_unlock_sock(sk);
2141                 goto exit;
2142         }
2143         peer_port = tsk_peer_port(tsk);
2144         peer_node = tsk_peer_node(tsk);
2145
2146         if (tsk->probing_state == TIPC_CONN_PROBING) {
2147                 if (!sock_owned_by_user(sk)) {
2148                         sk->sk_socket->state = SS_DISCONNECTING;
2149                         tsk->connected = 0;
2150                         tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2151                                               tsk_peer_port(tsk));
2152                         sk->sk_state_change(sk);
2153                 } else {
2154                         /* Try again later */
2155                         sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2156                 }
2157
2158         } else {
2159                 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2160                                       INT_H_SIZE, 0, peer_node, own_node,
2161                                       peer_port, tsk->portid, TIPC_OK);
2162                 tsk->probing_state = TIPC_CONN_PROBING;
2163                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2164         }
2165         bh_unlock_sock(sk);
2166         if (skb)
2167                 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2168 exit:
2169         sock_put(sk);
2170 }
2171
2172 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2173                            struct tipc_name_seq const *seq)
2174 {
2175         struct net *net = sock_net(&tsk->sk);
2176         struct publication *publ;
2177         u32 key;
2178
2179         if (tsk->connected)
2180                 return -EINVAL;
2181         key = tsk->portid + tsk->pub_count + 1;
2182         if (key == tsk->portid)
2183                 return -EADDRINUSE;
2184
2185         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2186                                     scope, tsk->portid, key);
2187         if (unlikely(!publ))
2188                 return -EINVAL;
2189
2190         list_add(&publ->pport_list, &tsk->publications);
2191         tsk->pub_count++;
2192         tsk->published = 1;
2193         return 0;
2194 }
2195
2196 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2197                             struct tipc_name_seq const *seq)
2198 {
2199         struct net *net = sock_net(&tsk->sk);
2200         struct publication *publ;
2201         struct publication *safe;
2202         int rc = -EINVAL;
2203
2204         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2205                 if (seq) {
2206                         if (publ->scope != scope)
2207                                 continue;
2208                         if (publ->type != seq->type)
2209                                 continue;
2210                         if (publ->lower != seq->lower)
2211                                 continue;
2212                         if (publ->upper != seq->upper)
2213                                 break;
2214                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2215                                               publ->ref, publ->key);
2216                         rc = 0;
2217                         break;
2218                 }
2219                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2220                                       publ->ref, publ->key);
2221                 rc = 0;
2222         }
2223         if (list_empty(&tsk->publications))
2224                 tsk->published = 0;
2225         return rc;
2226 }
2227
2228 /* tipc_sk_reinit: set non-zero address in all existing sockets
2229  *                 when we go from standalone to network mode.
2230  */
2231 void tipc_sk_reinit(struct net *net)
2232 {
2233         struct tipc_net *tn = net_generic(net, tipc_net_id);
2234         const struct bucket_table *tbl;
2235         struct rhash_head *pos;
2236         struct tipc_sock *tsk;
2237         struct tipc_msg *msg;
2238         int i;
2239
2240         rcu_read_lock();
2241         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2242         for (i = 0; i < tbl->size; i++) {
2243                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2244                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2245                         msg = &tsk->phdr;
2246                         msg_set_prevnode(msg, tn->own_addr);
2247                         msg_set_orignode(msg, tn->own_addr);
2248                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2249                 }
2250         }
2251         rcu_read_unlock();
2252 }
2253
2254 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2255 {
2256         struct tipc_net *tn = net_generic(net, tipc_net_id);
2257         struct tipc_sock *tsk;
2258
2259         rcu_read_lock();
2260         tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2261         if (tsk)
2262                 sock_hold(&tsk->sk);
2263         rcu_read_unlock();
2264
2265         return tsk;
2266 }
2267
2268 static int tipc_sk_insert(struct tipc_sock *tsk)
2269 {
2270         struct sock *sk = &tsk->sk;
2271         struct net *net = sock_net(sk);
2272         struct tipc_net *tn = net_generic(net, tipc_net_id);
2273         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2274         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2275
2276         while (remaining--) {
2277                 portid++;
2278                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2279                         portid = TIPC_MIN_PORT;
2280                 tsk->portid = portid;
2281                 sock_hold(&tsk->sk);
2282                 if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2283                                                    tsk_rht_params))
2284                         return 0;
2285                 sock_put(&tsk->sk);
2286         }
2287
2288         return -1;
2289 }
2290
2291 static void tipc_sk_remove(struct tipc_sock *tsk)
2292 {
2293         struct sock *sk = &tsk->sk;
2294         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2295
2296         if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2297                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2298                 __sock_put(sk);
2299         }
2300 }
2301
2302 static const struct rhashtable_params tsk_rht_params = {
2303         .nelem_hint = 192,
2304         .head_offset = offsetof(struct tipc_sock, node),
2305         .key_offset = offsetof(struct tipc_sock, portid),
2306         .key_len = sizeof(u32), /* portid */
2307         .max_size = 1048576,
2308         .min_size = 256,
2309         .automatic_shrinking = true,
2310 };
2311
2312 int tipc_sk_rht_init(struct net *net)
2313 {
2314         struct tipc_net *tn = net_generic(net, tipc_net_id);
2315
2316         return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2317 }
2318
2319 void tipc_sk_rht_destroy(struct net *net)
2320 {
2321         struct tipc_net *tn = net_generic(net, tipc_net_id);
2322
2323         /* Wait for socket readers to complete */
2324         synchronize_net();
2325
2326         rhashtable_destroy(&tn->sk_rht);
2327 }
2328
2329 /**
2330  * tipc_setsockopt - set socket option
2331  * @sock: socket structure
2332  * @lvl: option level
2333  * @opt: option identifier
2334  * @ov: pointer to new option value
2335  * @ol: length of option value
2336  *
2337  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2338  * (to ease compatibility).
2339  *
2340  * Returns 0 on success, errno otherwise
2341  */
2342 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2343                            char __user *ov, unsigned int ol)
2344 {
2345         struct sock *sk = sock->sk;
2346         struct tipc_sock *tsk = tipc_sk(sk);
2347         u32 value;
2348         int res;
2349
2350         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2351                 return 0;
2352         if (lvl != SOL_TIPC)
2353                 return -ENOPROTOOPT;
2354         if (ol < sizeof(value))
2355                 return -EINVAL;
2356         res = get_user(value, (u32 __user *)ov);
2357         if (res)
2358                 return res;
2359
2360         lock_sock(sk);
2361
2362         switch (opt) {
2363         case TIPC_IMPORTANCE:
2364                 res = tsk_set_importance(tsk, value);
2365                 break;
2366         case TIPC_SRC_DROPPABLE:
2367                 if (sock->type != SOCK_STREAM)
2368                         tsk_set_unreliable(tsk, value);
2369                 else
2370                         res = -ENOPROTOOPT;
2371                 break;
2372         case TIPC_DEST_DROPPABLE:
2373                 tsk_set_unreturnable(tsk, value);
2374                 break;
2375         case TIPC_CONN_TIMEOUT:
2376                 tipc_sk(sk)->conn_timeout = value;
2377                 /* no need to set "res", since already 0 at this point */
2378                 break;
2379         default:
2380                 res = -EINVAL;
2381         }
2382
2383         release_sock(sk);
2384
2385         return res;
2386 }
2387
2388 /**
2389  * tipc_getsockopt - get socket option
2390  * @sock: socket structure
2391  * @lvl: option level
2392  * @opt: option identifier
2393  * @ov: receptacle for option value
2394  * @ol: receptacle for length of option value
2395  *
2396  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2397  * (to ease compatibility).
2398  *
2399  * Returns 0 on success, errno otherwise
2400  */
2401 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2402                            char __user *ov, int __user *ol)
2403 {
2404         struct sock *sk = sock->sk;
2405         struct tipc_sock *tsk = tipc_sk(sk);
2406         int len;
2407         u32 value;
2408         int res;
2409
2410         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2411                 return put_user(0, ol);
2412         if (lvl != SOL_TIPC)
2413                 return -ENOPROTOOPT;
2414         res = get_user(len, ol);
2415         if (res)
2416                 return res;
2417
2418         lock_sock(sk);
2419
2420         switch (opt) {
2421         case TIPC_IMPORTANCE:
2422                 value = tsk_importance(tsk);
2423                 break;
2424         case TIPC_SRC_DROPPABLE:
2425                 value = tsk_unreliable(tsk);
2426                 break;
2427         case TIPC_DEST_DROPPABLE:
2428                 value = tsk_unreturnable(tsk);
2429                 break;
2430         case TIPC_CONN_TIMEOUT:
2431                 value = tsk->conn_timeout;
2432                 /* no need to set "res", since already 0 at this point */
2433                 break;
2434         case TIPC_NODE_RECVQ_DEPTH:
2435                 value = 0; /* was tipc_queue_size, now obsolete */
2436                 break;
2437         case TIPC_SOCK_RECVQ_DEPTH:
2438                 value = skb_queue_len(&sk->sk_receive_queue);
2439                 break;
2440         default:
2441                 res = -EINVAL;
2442         }
2443
2444         release_sock(sk);
2445
2446         if (res)
2447                 return res;     /* "get" failed */
2448
2449         if (len < sizeof(value))
2450                 return -EINVAL;
2451
2452         if (copy_to_user(ov, &value, sizeof(value)))
2453                 return -EFAULT;
2454
2455         return put_user(sizeof(value), ol);
2456 }
2457
2458 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2459 {
2460         struct sock *sk = sock->sk;
2461         struct tipc_sioc_ln_req lnr;
2462         void __user *argp = (void __user *)arg;
2463
2464         switch (cmd) {
2465         case SIOCGETLINKNAME:
2466                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2467                         return -EFAULT;
2468                 if (!tipc_node_get_linkname(sock_net(sk),
2469                                             lnr.bearer_id & 0xffff, lnr.peer,
2470                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2471                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2472                                 return -EFAULT;
2473                         return 0;
2474                 }
2475                 return -EADDRNOTAVAIL;
2476         default:
2477                 return -ENOIOCTLCMD;
2478         }
2479 }
2480
2481 /* Protocol switches for the various types of TIPC sockets */
2482
2483 static const struct proto_ops msg_ops = {
2484         .owner          = THIS_MODULE,
2485         .family         = AF_TIPC,
2486         .release        = tipc_release,
2487         .bind           = tipc_bind,
2488         .connect        = tipc_connect,
2489         .socketpair     = sock_no_socketpair,
2490         .accept         = sock_no_accept,
2491         .getname        = tipc_getname,
2492         .poll           = tipc_poll,
2493         .ioctl          = tipc_ioctl,
2494         .listen         = sock_no_listen,
2495         .shutdown       = tipc_shutdown,
2496         .setsockopt     = tipc_setsockopt,
2497         .getsockopt     = tipc_getsockopt,
2498         .sendmsg        = tipc_sendmsg,
2499         .recvmsg        = tipc_recvmsg,
2500         .mmap           = sock_no_mmap,
2501         .sendpage       = sock_no_sendpage
2502 };
2503
2504 static const struct proto_ops packet_ops = {
2505         .owner          = THIS_MODULE,
2506         .family         = AF_TIPC,
2507         .release        = tipc_release,
2508         .bind           = tipc_bind,
2509         .connect        = tipc_connect,
2510         .socketpair     = sock_no_socketpair,
2511         .accept         = tipc_accept,
2512         .getname        = tipc_getname,
2513         .poll           = tipc_poll,
2514         .ioctl          = tipc_ioctl,
2515         .listen         = tipc_listen,
2516         .shutdown       = tipc_shutdown,
2517         .setsockopt     = tipc_setsockopt,
2518         .getsockopt     = tipc_getsockopt,
2519         .sendmsg        = tipc_send_packet,
2520         .recvmsg        = tipc_recvmsg,
2521         .mmap           = sock_no_mmap,
2522         .sendpage       = sock_no_sendpage
2523 };
2524
2525 static const struct proto_ops stream_ops = {
2526         .owner          = THIS_MODULE,
2527         .family         = AF_TIPC,
2528         .release        = tipc_release,
2529         .bind           = tipc_bind,
2530         .connect        = tipc_connect,
2531         .socketpair     = sock_no_socketpair,
2532         .accept         = tipc_accept,
2533         .getname        = tipc_getname,
2534         .poll           = tipc_poll,
2535         .ioctl          = tipc_ioctl,
2536         .listen         = tipc_listen,
2537         .shutdown       = tipc_shutdown,
2538         .setsockopt     = tipc_setsockopt,
2539         .getsockopt     = tipc_getsockopt,
2540         .sendmsg        = tipc_send_stream,
2541         .recvmsg        = tipc_recv_stream,
2542         .mmap           = sock_no_mmap,
2543         .sendpage       = sock_no_sendpage
2544 };
2545
2546 static const struct net_proto_family tipc_family_ops = {
2547         .owner          = THIS_MODULE,
2548         .family         = AF_TIPC,
2549         .create         = tipc_sk_create
2550 };
2551
2552 static struct proto tipc_proto = {
2553         .name           = "TIPC",
2554         .owner          = THIS_MODULE,
2555         .obj_size       = sizeof(struct tipc_sock),
2556         .sysctl_rmem    = sysctl_tipc_rmem
2557 };
2558
2559 /**
2560  * tipc_socket_init - initialize TIPC socket interface
2561  *
2562  * Returns 0 on success, errno otherwise
2563  */
2564 int tipc_socket_init(void)
2565 {
2566         int res;
2567
2568         res = proto_register(&tipc_proto, 1);
2569         if (res) {
2570                 pr_err("Failed to register TIPC protocol type\n");
2571                 goto out;
2572         }
2573
2574         res = sock_register(&tipc_family_ops);
2575         if (res) {
2576                 pr_err("Failed to register TIPC socket type\n");
2577                 proto_unregister(&tipc_proto);
2578                 goto out;
2579         }
2580  out:
2581         return res;
2582 }
2583
2584 /**
2585  * tipc_socket_stop - stop TIPC socket interface
2586  */
2587 void tipc_socket_stop(void)
2588 {
2589         sock_unregister(tipc_family_ops.family);
2590         proto_unregister(&tipc_proto);
2591 }
2592
2593 /* Caller should hold socket lock for the passed tipc socket. */
2594 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2595 {
2596         u32 peer_node;
2597         u32 peer_port;
2598         struct nlattr *nest;
2599
2600         peer_node = tsk_peer_node(tsk);
2601         peer_port = tsk_peer_port(tsk);
2602
2603         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2604
2605         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2606                 goto msg_full;
2607         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2608                 goto msg_full;
2609
2610         if (tsk->conn_type != 0) {
2611                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2612                         goto msg_full;
2613                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2614                         goto msg_full;
2615                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2616                         goto msg_full;
2617         }
2618         nla_nest_end(skb, nest);
2619
2620         return 0;
2621
2622 msg_full:
2623         nla_nest_cancel(skb, nest);
2624
2625         return -EMSGSIZE;
2626 }
2627
2628 /* Caller should hold socket lock for the passed tipc socket. */
2629 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2630                             struct tipc_sock *tsk)
2631 {
2632         int err;
2633         void *hdr;
2634         struct nlattr *attrs;
2635         struct net *net = sock_net(skb->sk);
2636         struct tipc_net *tn = net_generic(net, tipc_net_id);
2637
2638         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2639                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2640         if (!hdr)
2641                 goto msg_cancel;
2642
2643         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2644         if (!attrs)
2645                 goto genlmsg_cancel;
2646         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2647                 goto attr_msg_cancel;
2648         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2649                 goto attr_msg_cancel;
2650
2651         if (tsk->connected) {
2652                 err = __tipc_nl_add_sk_con(skb, tsk);
2653                 if (err)
2654                         goto attr_msg_cancel;
2655         } else if (!list_empty(&tsk->publications)) {
2656                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2657                         goto attr_msg_cancel;
2658         }
2659         nla_nest_end(skb, attrs);
2660         genlmsg_end(skb, hdr);
2661
2662         return 0;
2663
2664 attr_msg_cancel:
2665         nla_nest_cancel(skb, attrs);
2666 genlmsg_cancel:
2667         genlmsg_cancel(skb, hdr);
2668 msg_cancel:
2669         return -EMSGSIZE;
2670 }
2671
2672 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2673 {
2674         int err;
2675         struct tipc_sock *tsk;
2676         const struct bucket_table *tbl;
2677         struct rhash_head *pos;
2678         struct net *net = sock_net(skb->sk);
2679         struct tipc_net *tn = net_generic(net, tipc_net_id);
2680         u32 tbl_id = cb->args[0];
2681         u32 prev_portid = cb->args[1];
2682
2683         rcu_read_lock();
2684         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2685         for (; tbl_id < tbl->size; tbl_id++) {
2686                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2687                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2688                         if (prev_portid && prev_portid != tsk->portid) {
2689                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2690                                 continue;
2691                         }
2692
2693                         err = __tipc_nl_add_sk(skb, cb, tsk);
2694                         if (err) {
2695                                 prev_portid = tsk->portid;
2696                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2697                                 goto out;
2698                         }
2699                         prev_portid = 0;
2700                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2701                 }
2702         }
2703 out:
2704         rcu_read_unlock();
2705         cb->args[0] = tbl_id;
2706         cb->args[1] = prev_portid;
2707
2708         return skb->len;
2709 }
2710
2711 /* Caller should hold socket lock for the passed tipc socket. */
2712 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2713                                  struct netlink_callback *cb,
2714                                  struct publication *publ)
2715 {
2716         void *hdr;
2717         struct nlattr *attrs;
2718
2719         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2720                           &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2721         if (!hdr)
2722                 goto msg_cancel;
2723
2724         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2725         if (!attrs)
2726                 goto genlmsg_cancel;
2727
2728         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2729                 goto attr_msg_cancel;
2730         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2731                 goto attr_msg_cancel;
2732         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2733                 goto attr_msg_cancel;
2734         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2735                 goto attr_msg_cancel;
2736
2737         nla_nest_end(skb, attrs);
2738         genlmsg_end(skb, hdr);
2739
2740         return 0;
2741
2742 attr_msg_cancel:
2743         nla_nest_cancel(skb, attrs);
2744 genlmsg_cancel:
2745         genlmsg_cancel(skb, hdr);
2746 msg_cancel:
2747         return -EMSGSIZE;
2748 }
2749
2750 /* Caller should hold socket lock for the passed tipc socket. */
2751 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2752                                   struct netlink_callback *cb,
2753                                   struct tipc_sock *tsk, u32 *last_publ)
2754 {
2755         int err;
2756         struct publication *p;
2757
2758         if (*last_publ) {
2759                 list_for_each_entry(p, &tsk->publications, pport_list) {
2760                         if (p->key == *last_publ)
2761                                 break;
2762                 }
2763                 if (p->key != *last_publ) {
2764                         /* We never set seq or call nl_dump_check_consistent()
2765                          * this means that setting prev_seq here will cause the
2766                          * consistence check to fail in the netlink callback
2767                          * handler. Resulting in the last NLMSG_DONE message
2768                          * having the NLM_F_DUMP_INTR flag set.
2769                          */
2770                         cb->prev_seq = 1;
2771                         *last_publ = 0;
2772                         return -EPIPE;
2773                 }
2774         } else {
2775                 p = list_first_entry(&tsk->publications, struct publication,
2776                                      pport_list);
2777         }
2778
2779         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2780                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2781                 if (err) {
2782                         *last_publ = p->key;
2783                         return err;
2784                 }
2785         }
2786         *last_publ = 0;
2787
2788         return 0;
2789 }
2790
2791 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2792 {
2793         int err;
2794         u32 tsk_portid = cb->args[0];
2795         u32 last_publ = cb->args[1];
2796         u32 done = cb->args[2];
2797         struct net *net = sock_net(skb->sk);
2798         struct tipc_sock *tsk;
2799
2800         if (!tsk_portid) {
2801                 struct nlattr **attrs;
2802                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2803
2804                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2805                 if (err)
2806                         return err;
2807
2808                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2809                                        attrs[TIPC_NLA_SOCK],
2810                                        tipc_nl_sock_policy);
2811                 if (err)
2812                         return err;
2813
2814                 if (!sock[TIPC_NLA_SOCK_REF])
2815                         return -EINVAL;
2816
2817                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2818         }
2819
2820         if (done)
2821                 return 0;
2822
2823         tsk = tipc_sk_lookup(net, tsk_portid);
2824         if (!tsk)
2825                 return -EINVAL;
2826
2827         lock_sock(&tsk->sk);
2828         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2829         if (!err)
2830                 done = 1;
2831         release_sock(&tsk->sk);
2832         sock_put(&tsk->sk);
2833
2834         cb->args[0] = tsk_portid;
2835         cb->args[1] = last_publ;
2836         cb->args[2] = done;
2837
2838         return skb->len;
2839 }