2495006145685729bf7876b6346eac37aba3dd26
[cascardo/linux.git] / net / tipc / socket.c
1 /*
2 * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "port.h"
39 #include "node.h"
40
41 #include <linux/export.h>
42
43 #define SS_LISTENING    -1      /* socket is listening */
44 #define SS_READY        -2      /* socket is connectionless */
45
46 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
47
48 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
49 static void tipc_data_ready(struct sock *sk);
50 static void tipc_write_space(struct sock *sk);
51 static int tipc_release(struct socket *sock);
52 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
53
54 static const struct proto_ops packet_ops;
55 static const struct proto_ops stream_ops;
56 static const struct proto_ops msg_ops;
57
58 static struct proto tipc_proto;
59 static struct proto tipc_proto_kern;
60
61 /*
62  * Revised TIPC socket locking policy:
63  *
64  * Most socket operations take the standard socket lock when they start
65  * and hold it until they finish (or until they need to sleep).  Acquiring
66  * this lock grants the owner exclusive access to the fields of the socket
67  * data structures, with the exception of the backlog queue.  A few socket
68  * operations can be done without taking the socket lock because they only
69  * read socket information that never changes during the life of the socket.
70  *
71  * Socket operations may acquire the lock for the associated TIPC port if they
72  * need to perform an operation on the port.  If any routine needs to acquire
73  * both the socket lock and the port lock it must take the socket lock first
74  * to avoid the risk of deadlock.
75  *
76  * The dispatcher handling incoming messages cannot grab the socket lock in
77  * the standard fashion, since invoked it runs at the BH level and cannot block.
78  * Instead, it checks to see if the socket lock is currently owned by someone,
79  * and either handles the message itself or adds it to the socket's backlog
80  * queue; in the latter case the queued message is processed once the process
81  * owning the socket lock releases it.
82  *
83  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
84  * the problem of a blocked socket operation preventing any other operations
85  * from occurring.  However, applications must be careful if they have
86  * multiple threads trying to send (or receive) on the same socket, as these
87  * operations might interfere with each other.  For example, doing a connect
88  * and a receive at the same time might allow the receive to consume the
89  * ACK message meant for the connect.  While additional work could be done
90  * to try and overcome this, it doesn't seem to be worthwhile at the present.
91  *
92  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
93  * that another operation that must be performed in a non-blocking manner is
94  * not delayed for very long because the lock has already been taken.
95  *
96  * NOTE: This code assumes that certain fields of a port/socket pair are
97  * constant over its lifetime; such fields can be examined without taking
98  * the socket lock and/or port lock, and do not need to be re-read even
99  * after resuming processing after waiting.  These fields include:
100  *   - socket type
101  *   - pointer to socket sk structure (aka tipc_sock structure)
102  *   - pointer to port structure
103  *   - port reference
104  */
105
106 #include "socket.h"
107
108 /**
109  * advance_rx_queue - discard first buffer in socket receive queue
110  *
111  * Caller must hold socket lock
112  */
113 static void advance_rx_queue(struct sock *sk)
114 {
115         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
116 }
117
118 /**
119  * reject_rx_queue - reject all buffers in socket receive queue
120  *
121  * Caller must hold socket lock
122  */
123 static void reject_rx_queue(struct sock *sk)
124 {
125         struct sk_buff *buf;
126
127         while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
128                 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
129 }
130
131 /**
132  * tipc_sk_create - create a TIPC socket
133  * @net: network namespace (must be default network)
134  * @sock: pre-allocated socket structure
135  * @protocol: protocol indicator (must be 0)
136  * @kern: caused by kernel or by userspace?
137  *
138  * This routine creates additional data structures used by the TIPC socket,
139  * initializes them, and links them together.
140  *
141  * Returns 0 on success, errno otherwise
142  */
143 static int tipc_sk_create(struct net *net, struct socket *sock,
144                           int protocol, int kern)
145 {
146         const struct proto_ops *ops;
147         socket_state state;
148         struct sock *sk;
149         struct tipc_sock *tsk;
150         struct tipc_port *port;
151         u32 ref;
152
153         /* Validate arguments */
154         if (unlikely(protocol != 0))
155                 return -EPROTONOSUPPORT;
156
157         switch (sock->type) {
158         case SOCK_STREAM:
159                 ops = &stream_ops;
160                 state = SS_UNCONNECTED;
161                 break;
162         case SOCK_SEQPACKET:
163                 ops = &packet_ops;
164                 state = SS_UNCONNECTED;
165                 break;
166         case SOCK_DGRAM:
167         case SOCK_RDM:
168                 ops = &msg_ops;
169                 state = SS_READY;
170                 break;
171         default:
172                 return -EPROTOTYPE;
173         }
174
175         /* Allocate socket's protocol area */
176         if (!kern)
177                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
178         else
179                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
180
181         if (sk == NULL)
182                 return -ENOMEM;
183
184         tsk = tipc_sk(sk);
185         port = &tsk->port;
186
187         ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
188         if (!ref) {
189                 pr_warn("Socket registration failed, ref. table exhausted\n");
190                 sk_free(sk);
191                 return -ENOMEM;
192         }
193
194         /* Finish initializing socket data structures */
195         sock->ops = ops;
196         sock->state = state;
197
198         sock_init_data(sock, sk);
199         sk->sk_backlog_rcv = tipc_backlog_rcv;
200         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
201         sk->sk_data_ready = tipc_data_ready;
202         sk->sk_write_space = tipc_write_space;
203         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
204         atomic_set(&tsk->dupl_rcvcnt, 0);
205         tipc_port_unlock(port);
206
207         if (sock->state == SS_READY) {
208                 tipc_port_set_unreturnable(port, true);
209                 if (sock->type == SOCK_DGRAM)
210                         tipc_port_set_unreliable(port, true);
211         }
212         return 0;
213 }
214
215 /**
216  * tipc_sock_create_local - create TIPC socket from inside TIPC module
217  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
218  *
219  * We cannot use sock_creat_kern here because it bumps module user count.
220  * Since socket owner and creator is the same module we must make sure
221  * that module count remains zero for module local sockets, otherwise
222  * we cannot do rmmod.
223  *
224  * Returns 0 on success, errno otherwise
225  */
226 int tipc_sock_create_local(int type, struct socket **res)
227 {
228         int rc;
229
230         rc = sock_create_lite(AF_TIPC, type, 0, res);
231         if (rc < 0) {
232                 pr_err("Failed to create kernel socket\n");
233                 return rc;
234         }
235         tipc_sk_create(&init_net, *res, 0, 1);
236
237         return 0;
238 }
239
240 /**
241  * tipc_sock_release_local - release socket created by tipc_sock_create_local
242  * @sock: the socket to be released.
243  *
244  * Module reference count is not incremented when such sockets are created,
245  * so we must keep it from being decremented when they are released.
246  */
247 void tipc_sock_release_local(struct socket *sock)
248 {
249         tipc_release(sock);
250         sock->ops = NULL;
251         sock_release(sock);
252 }
253
254 /**
255  * tipc_sock_accept_local - accept a connection on a socket created
256  * with tipc_sock_create_local. Use this function to avoid that
257  * module reference count is inadvertently incremented.
258  *
259  * @sock:    the accepting socket
260  * @newsock: reference to the new socket to be created
261  * @flags:   socket flags
262  */
263
264 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
265                            int flags)
266 {
267         struct sock *sk = sock->sk;
268         int ret;
269
270         ret = sock_create_lite(sk->sk_family, sk->sk_type,
271                                sk->sk_protocol, newsock);
272         if (ret < 0)
273                 return ret;
274
275         ret = tipc_accept(sock, *newsock, flags);
276         if (ret < 0) {
277                 sock_release(*newsock);
278                 return ret;
279         }
280         (*newsock)->ops = sock->ops;
281         return ret;
282 }
283
284 /**
285  * tipc_release - destroy a TIPC socket
286  * @sock: socket to destroy
287  *
288  * This routine cleans up any messages that are still queued on the socket.
289  * For DGRAM and RDM socket types, all queued messages are rejected.
290  * For SEQPACKET and STREAM socket types, the first message is rejected
291  * and any others are discarded.  (If the first message on a STREAM socket
292  * is partially-read, it is discarded and the next one is rejected instead.)
293  *
294  * NOTE: Rejected messages are not necessarily returned to the sender!  They
295  * are returned or discarded according to the "destination droppable" setting
296  * specified for the message by the sender.
297  *
298  * Returns 0 on success, errno otherwise
299  */
300 static int tipc_release(struct socket *sock)
301 {
302         struct sock *sk = sock->sk;
303         struct tipc_sock *tsk;
304         struct tipc_port *port;
305         struct sk_buff *buf;
306
307         /*
308          * Exit if socket isn't fully initialized (occurs when a failed accept()
309          * releases a pre-allocated child socket that was never used)
310          */
311         if (sk == NULL)
312                 return 0;
313
314         tsk = tipc_sk(sk);
315         port = &tsk->port;
316         lock_sock(sk);
317
318         /*
319          * Reject all unreceived messages, except on an active connection
320          * (which disconnects locally & sends a 'FIN+' to peer)
321          */
322         while (sock->state != SS_DISCONNECTING) {
323                 buf = __skb_dequeue(&sk->sk_receive_queue);
324                 if (buf == NULL)
325                         break;
326                 if (TIPC_SKB_CB(buf)->handle != NULL)
327                         kfree_skb(buf);
328                 else {
329                         if ((sock->state == SS_CONNECTING) ||
330                             (sock->state == SS_CONNECTED)) {
331                                 sock->state = SS_DISCONNECTING;
332                                 tipc_port_disconnect(port->ref);
333                         }
334                         tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
335                 }
336         }
337
338         /* Destroy TIPC port; also disconnects an active connection and
339          * sends a 'FIN-' to peer.
340          */
341         tipc_port_destroy(port);
342
343         /* Discard any remaining (connection-based) messages in receive queue */
344         __skb_queue_purge(&sk->sk_receive_queue);
345
346         /* Reject any messages that accumulated in backlog queue */
347         sock->state = SS_DISCONNECTING;
348         release_sock(sk);
349
350         sock_put(sk);
351         sock->sk = NULL;
352
353         return 0;
354 }
355
356 /**
357  * tipc_bind - associate or disassocate TIPC name(s) with a socket
358  * @sock: socket structure
359  * @uaddr: socket address describing name(s) and desired operation
360  * @uaddr_len: size of socket address data structure
361  *
362  * Name and name sequence binding is indicated using a positive scope value;
363  * a negative scope value unbinds the specified name.  Specifying no name
364  * (i.e. a socket address length of 0) unbinds all names from the socket.
365  *
366  * Returns 0 on success, errno otherwise
367  *
368  * NOTE: This routine doesn't need to take the socket lock since it doesn't
369  *       access any non-constant socket information.
370  */
371 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
372                      int uaddr_len)
373 {
374         struct sock *sk = sock->sk;
375         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
376         struct tipc_sock *tsk = tipc_sk(sk);
377         int res = -EINVAL;
378
379         lock_sock(sk);
380         if (unlikely(!uaddr_len)) {
381                 res = tipc_withdraw(&tsk->port, 0, NULL);
382                 goto exit;
383         }
384
385         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
386                 res = -EINVAL;
387                 goto exit;
388         }
389         if (addr->family != AF_TIPC) {
390                 res = -EAFNOSUPPORT;
391                 goto exit;
392         }
393
394         if (addr->addrtype == TIPC_ADDR_NAME)
395                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
396         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
397                 res = -EAFNOSUPPORT;
398                 goto exit;
399         }
400
401         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
402             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
403             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
404                 res = -EACCES;
405                 goto exit;
406         }
407
408         res = (addr->scope > 0) ?
409                 tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
410                 tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
411 exit:
412         release_sock(sk);
413         return res;
414 }
415
416 /**
417  * tipc_getname - get port ID of socket or peer socket
418  * @sock: socket structure
419  * @uaddr: area for returned socket address
420  * @uaddr_len: area for returned length of socket address
421  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
422  *
423  * Returns 0 on success, errno otherwise
424  *
425  * NOTE: This routine doesn't need to take the socket lock since it only
426  *       accesses socket information that is unchanging (or which changes in
427  *       a completely predictable manner).
428  */
429 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
430                         int *uaddr_len, int peer)
431 {
432         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
433         struct tipc_sock *tsk = tipc_sk(sock->sk);
434
435         memset(addr, 0, sizeof(*addr));
436         if (peer) {
437                 if ((sock->state != SS_CONNECTED) &&
438                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
439                         return -ENOTCONN;
440                 addr->addr.id.ref = tipc_port_peerport(&tsk->port);
441                 addr->addr.id.node = tipc_port_peernode(&tsk->port);
442         } else {
443                 addr->addr.id.ref = tsk->port.ref;
444                 addr->addr.id.node = tipc_own_addr;
445         }
446
447         *uaddr_len = sizeof(*addr);
448         addr->addrtype = TIPC_ADDR_ID;
449         addr->family = AF_TIPC;
450         addr->scope = 0;
451         addr->addr.name.domain = 0;
452
453         return 0;
454 }
455
456 /**
457  * tipc_poll - read and possibly block on pollmask
458  * @file: file structure associated with the socket
459  * @sock: socket for which to calculate the poll bits
460  * @wait: ???
461  *
462  * Returns pollmask value
463  *
464  * COMMENTARY:
465  * It appears that the usual socket locking mechanisms are not useful here
466  * since the pollmask info is potentially out-of-date the moment this routine
467  * exits.  TCP and other protocols seem to rely on higher level poll routines
468  * to handle any preventable race conditions, so TIPC will do the same ...
469  *
470  * TIPC sets the returned events as follows:
471  *
472  * socket state         flags set
473  * ------------         ---------
474  * unconnected          no read flags
475  *                      POLLOUT if port is not congested
476  *
477  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
478  *                      no write flags
479  *
480  * connected            POLLIN/POLLRDNORM if data in rx queue
481  *                      POLLOUT if port is not congested
482  *
483  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
484  *                      no write flags
485  *
486  * listening            POLLIN if SYN in rx queue
487  *                      no write flags
488  *
489  * ready                POLLIN/POLLRDNORM if data in rx queue
490  * [connectionless]     POLLOUT (since port cannot be congested)
491  *
492  * IMPORTANT: The fact that a read or write operation is indicated does NOT
493  * imply that the operation will succeed, merely that it should be performed
494  * and will not block.
495  */
496 static unsigned int tipc_poll(struct file *file, struct socket *sock,
497                               poll_table *wait)
498 {
499         struct sock *sk = sock->sk;
500         struct tipc_sock *tsk = tipc_sk(sk);
501         u32 mask = 0;
502
503         sock_poll_wait(file, sk_sleep(sk), wait);
504
505         switch ((int)sock->state) {
506         case SS_UNCONNECTED:
507                 if (!tsk->port.congested)
508                         mask |= POLLOUT;
509                 break;
510         case SS_READY:
511         case SS_CONNECTED:
512                 if (!tsk->port.congested)
513                         mask |= POLLOUT;
514                 /* fall thru' */
515         case SS_CONNECTING:
516         case SS_LISTENING:
517                 if (!skb_queue_empty(&sk->sk_receive_queue))
518                         mask |= (POLLIN | POLLRDNORM);
519                 break;
520         case SS_DISCONNECTING:
521                 mask = (POLLIN | POLLRDNORM | POLLHUP);
522                 break;
523         }
524
525         return mask;
526 }
527
528 /**
529  * dest_name_check - verify user is permitted to send to specified port name
530  * @dest: destination address
531  * @m: descriptor for message to be sent
532  *
533  * Prevents restricted configuration commands from being issued by
534  * unauthorized users.
535  *
536  * Returns 0 if permission is granted, otherwise errno
537  */
538 static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
539 {
540         struct tipc_cfg_msg_hdr hdr;
541
542         if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
543                 return 0;
544         if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
545                 return 0;
546         if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
547                 return -EACCES;
548
549         if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
550                 return -EMSGSIZE;
551         if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
552                 return -EFAULT;
553         if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
554                 return -EACCES;
555
556         return 0;
557 }
558
559 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
560 {
561         struct sock *sk = sock->sk;
562         struct tipc_sock *tsk = tipc_sk(sk);
563         DEFINE_WAIT(wait);
564         int done;
565
566         do {
567                 int err = sock_error(sk);
568                 if (err)
569                         return err;
570                 if (sock->state == SS_DISCONNECTING)
571                         return -EPIPE;
572                 if (!*timeo_p)
573                         return -EAGAIN;
574                 if (signal_pending(current))
575                         return sock_intr_errno(*timeo_p);
576
577                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
578                 done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
579                 finish_wait(sk_sleep(sk), &wait);
580         } while (!done);
581         return 0;
582 }
583
584
585 /**
586  * tipc_sendmsg - send message in connectionless manner
587  * @iocb: if NULL, indicates that socket lock is already held
588  * @sock: socket structure
589  * @m: message to send
590  * @total_len: length of message
591  *
592  * Message must have an destination specified explicitly.
593  * Used for SOCK_RDM and SOCK_DGRAM messages,
594  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
595  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
596  *
597  * Returns the number of bytes sent on success, or errno otherwise
598  */
599 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
600                         struct msghdr *m, size_t total_len)
601 {
602         struct sock *sk = sock->sk;
603         struct tipc_sock *tsk = tipc_sk(sk);
604         struct tipc_port *port = &tsk->port;
605         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
606         int needs_conn;
607         long timeo;
608         int res = -EINVAL;
609
610         if (unlikely(!dest))
611                 return -EDESTADDRREQ;
612         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
613                      (dest->family != AF_TIPC)))
614                 return -EINVAL;
615         if (total_len > TIPC_MAX_USER_MSG_SIZE)
616                 return -EMSGSIZE;
617
618         if (iocb)
619                 lock_sock(sk);
620
621         needs_conn = (sock->state != SS_READY);
622         if (unlikely(needs_conn)) {
623                 if (sock->state == SS_LISTENING) {
624                         res = -EPIPE;
625                         goto exit;
626                 }
627                 if (sock->state != SS_UNCONNECTED) {
628                         res = -EISCONN;
629                         goto exit;
630                 }
631                 if (tsk->port.published) {
632                         res = -EOPNOTSUPP;
633                         goto exit;
634                 }
635                 if (dest->addrtype == TIPC_ADDR_NAME) {
636                         tsk->port.conn_type = dest->addr.name.name.type;
637                         tsk->port.conn_instance = dest->addr.name.name.instance;
638                 }
639
640                 /* Abort any pending connection attempts (very unlikely) */
641                 reject_rx_queue(sk);
642         }
643
644         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
645         do {
646                 if (dest->addrtype == TIPC_ADDR_NAME) {
647                         res = dest_name_check(dest, m);
648                         if (res)
649                                 break;
650                         res = tipc_send2name(port,
651                                              &dest->addr.name.name,
652                                              dest->addr.name.domain,
653                                              m->msg_iov,
654                                              total_len);
655                 } else if (dest->addrtype == TIPC_ADDR_ID) {
656                         res = tipc_send2port(port,
657                                              &dest->addr.id,
658                                              m->msg_iov,
659                                              total_len);
660                 } else if (dest->addrtype == TIPC_ADDR_MCAST) {
661                         if (needs_conn) {
662                                 res = -EOPNOTSUPP;
663                                 break;
664                         }
665                         res = dest_name_check(dest, m);
666                         if (res)
667                                 break;
668                         res = tipc_port_mcast_xmit(port,
669                                                    &dest->addr.nameseq,
670                                                    m->msg_iov,
671                                                    total_len);
672                 }
673                 if (likely(res != -ELINKCONG)) {
674                         if (needs_conn && (res >= 0))
675                                 sock->state = SS_CONNECTING;
676                         break;
677                 }
678                 res = tipc_wait_for_sndmsg(sock, &timeo);
679                 if (res)
680                         break;
681         } while (1);
682
683 exit:
684         if (iocb)
685                 release_sock(sk);
686         return res;
687 }
688
689 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
690 {
691         struct sock *sk = sock->sk;
692         struct tipc_sock *tsk = tipc_sk(sk);
693         struct tipc_port *port = &tsk->port;
694         DEFINE_WAIT(wait);
695         int done;
696
697         do {
698                 int err = sock_error(sk);
699                 if (err)
700                         return err;
701                 if (sock->state == SS_DISCONNECTING)
702                         return -EPIPE;
703                 else if (sock->state != SS_CONNECTED)
704                         return -ENOTCONN;
705                 if (!*timeo_p)
706                         return -EAGAIN;
707                 if (signal_pending(current))
708                         return sock_intr_errno(*timeo_p);
709
710                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
711                 done = sk_wait_event(sk, timeo_p,
712                                      (!port->congested || !port->connected));
713                 finish_wait(sk_sleep(sk), &wait);
714         } while (!done);
715         return 0;
716 }
717
718 /**
719  * tipc_send_packet - send a connection-oriented message
720  * @iocb: if NULL, indicates that socket lock is already held
721  * @sock: socket structure
722  * @m: message to send
723  * @total_len: length of message
724  *
725  * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
726  *
727  * Returns the number of bytes sent on success, or errno otherwise
728  */
729 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
730                             struct msghdr *m, size_t total_len)
731 {
732         struct sock *sk = sock->sk;
733         struct tipc_sock *tsk = tipc_sk(sk);
734         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
735         int res = -EINVAL;
736         long timeo;
737
738         /* Handle implied connection establishment */
739         if (unlikely(dest))
740                 return tipc_sendmsg(iocb, sock, m, total_len);
741
742         if (total_len > TIPC_MAX_USER_MSG_SIZE)
743                 return -EMSGSIZE;
744
745         if (iocb)
746                 lock_sock(sk);
747
748         if (unlikely(sock->state != SS_CONNECTED)) {
749                 if (sock->state == SS_DISCONNECTING)
750                         res = -EPIPE;
751                 else
752                         res = -ENOTCONN;
753                 goto exit;
754         }
755
756         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
757         do {
758                 res = tipc_send(&tsk->port, m->msg_iov, total_len);
759                 if (likely(res != -ELINKCONG))
760                         break;
761                 res = tipc_wait_for_sndpkt(sock, &timeo);
762                 if (res)
763                         break;
764         } while (1);
765 exit:
766         if (iocb)
767                 release_sock(sk);
768         return res;
769 }
770
771 /**
772  * tipc_send_stream - send stream-oriented data
773  * @iocb: (unused)
774  * @sock: socket structure
775  * @m: data to send
776  * @total_len: total length of data to be sent
777  *
778  * Used for SOCK_STREAM data.
779  *
780  * Returns the number of bytes sent on success (or partial success),
781  * or errno if no data sent
782  */
783 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
784                             struct msghdr *m, size_t total_len)
785 {
786         struct sock *sk = sock->sk;
787         struct tipc_sock *tsk = tipc_sk(sk);
788         struct msghdr my_msg;
789         struct iovec my_iov;
790         struct iovec *curr_iov;
791         int curr_iovlen;
792         char __user *curr_start;
793         u32 hdr_size;
794         int curr_left;
795         int bytes_to_send;
796         int bytes_sent;
797         int res;
798
799         lock_sock(sk);
800
801         /* Handle special cases where there is no connection */
802         if (unlikely(sock->state != SS_CONNECTED)) {
803                 if (sock->state == SS_UNCONNECTED)
804                         res = tipc_send_packet(NULL, sock, m, total_len);
805                 else
806                         res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
807                 goto exit;
808         }
809
810         if (unlikely(m->msg_name)) {
811                 res = -EISCONN;
812                 goto exit;
813         }
814
815         if (total_len > (unsigned int)INT_MAX) {
816                 res = -EMSGSIZE;
817                 goto exit;
818         }
819
820         /*
821          * Send each iovec entry using one or more messages
822          *
823          * Note: This algorithm is good for the most likely case
824          * (i.e. one large iovec entry), but could be improved to pass sets
825          * of small iovec entries into send_packet().
826          */
827         curr_iov = m->msg_iov;
828         curr_iovlen = m->msg_iovlen;
829         my_msg.msg_iov = &my_iov;
830         my_msg.msg_iovlen = 1;
831         my_msg.msg_flags = m->msg_flags;
832         my_msg.msg_name = NULL;
833         bytes_sent = 0;
834
835         hdr_size = msg_hdr_sz(&tsk->port.phdr);
836
837         while (curr_iovlen--) {
838                 curr_start = curr_iov->iov_base;
839                 curr_left = curr_iov->iov_len;
840
841                 while (curr_left) {
842                         bytes_to_send = tsk->port.max_pkt - hdr_size;
843                         if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
844                                 bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
845                         if (curr_left < bytes_to_send)
846                                 bytes_to_send = curr_left;
847                         my_iov.iov_base = curr_start;
848                         my_iov.iov_len = bytes_to_send;
849                         res = tipc_send_packet(NULL, sock, &my_msg,
850                                                bytes_to_send);
851                         if (res < 0) {
852                                 if (bytes_sent)
853                                         res = bytes_sent;
854                                 goto exit;
855                         }
856                         curr_left -= bytes_to_send;
857                         curr_start += bytes_to_send;
858                         bytes_sent += bytes_to_send;
859                 }
860
861                 curr_iov++;
862         }
863         res = bytes_sent;
864 exit:
865         release_sock(sk);
866         return res;
867 }
868
869 /**
870  * auto_connect - complete connection setup to a remote port
871  * @tsk: tipc socket structure
872  * @msg: peer's response message
873  *
874  * Returns 0 on success, errno otherwise
875  */
876 static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
877 {
878         struct tipc_port *port = &tsk->port;
879         struct socket *sock = tsk->sk.sk_socket;
880         struct tipc_portid peer;
881
882         peer.ref = msg_origport(msg);
883         peer.node = msg_orignode(msg);
884
885         __tipc_port_connect(port->ref, port, &peer);
886
887         if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
888                 return -EINVAL;
889         msg_set_importance(&port->phdr, (u32)msg_importance(msg));
890         sock->state = SS_CONNECTED;
891         return 0;
892 }
893
894 /**
895  * set_orig_addr - capture sender's address for received message
896  * @m: descriptor for message info
897  * @msg: received message header
898  *
899  * Note: Address is not captured if not requested by receiver.
900  */
901 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
902 {
903         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
904
905         if (addr) {
906                 addr->family = AF_TIPC;
907                 addr->addrtype = TIPC_ADDR_ID;
908                 memset(&addr->addr, 0, sizeof(addr->addr));
909                 addr->addr.id.ref = msg_origport(msg);
910                 addr->addr.id.node = msg_orignode(msg);
911                 addr->addr.name.domain = 0;     /* could leave uninitialized */
912                 addr->scope = 0;                /* could leave uninitialized */
913                 m->msg_namelen = sizeof(struct sockaddr_tipc);
914         }
915 }
916
917 /**
918  * anc_data_recv - optionally capture ancillary data for received message
919  * @m: descriptor for message info
920  * @msg: received message header
921  * @tport: TIPC port associated with message
922  *
923  * Note: Ancillary data is not captured if not requested by receiver.
924  *
925  * Returns 0 if successful, otherwise errno
926  */
927 static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
928                          struct tipc_port *tport)
929 {
930         u32 anc_data[3];
931         u32 err;
932         u32 dest_type;
933         int has_name;
934         int res;
935
936         if (likely(m->msg_controllen == 0))
937                 return 0;
938
939         /* Optionally capture errored message object(s) */
940         err = msg ? msg_errcode(msg) : 0;
941         if (unlikely(err)) {
942                 anc_data[0] = err;
943                 anc_data[1] = msg_data_sz(msg);
944                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
945                 if (res)
946                         return res;
947                 if (anc_data[1]) {
948                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
949                                        msg_data(msg));
950                         if (res)
951                                 return res;
952                 }
953         }
954
955         /* Optionally capture message destination object */
956         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
957         switch (dest_type) {
958         case TIPC_NAMED_MSG:
959                 has_name = 1;
960                 anc_data[0] = msg_nametype(msg);
961                 anc_data[1] = msg_namelower(msg);
962                 anc_data[2] = msg_namelower(msg);
963                 break;
964         case TIPC_MCAST_MSG:
965                 has_name = 1;
966                 anc_data[0] = msg_nametype(msg);
967                 anc_data[1] = msg_namelower(msg);
968                 anc_data[2] = msg_nameupper(msg);
969                 break;
970         case TIPC_CONN_MSG:
971                 has_name = (tport->conn_type != 0);
972                 anc_data[0] = tport->conn_type;
973                 anc_data[1] = tport->conn_instance;
974                 anc_data[2] = tport->conn_instance;
975                 break;
976         default:
977                 has_name = 0;
978         }
979         if (has_name) {
980                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
981                 if (res)
982                         return res;
983         }
984
985         return 0;
986 }
987
988 static int tipc_wait_for_rcvmsg(struct socket *sock, long timeo)
989 {
990         struct sock *sk = sock->sk;
991         DEFINE_WAIT(wait);
992         int err;
993
994         for (;;) {
995                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
996                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
997                         if (sock->state == SS_DISCONNECTING) {
998                                 err = -ENOTCONN;
999                                 break;
1000                         }
1001                         release_sock(sk);
1002                         timeo = schedule_timeout(timeo);
1003                         lock_sock(sk);
1004                 }
1005                 err = 0;
1006                 if (!skb_queue_empty(&sk->sk_receive_queue))
1007                         break;
1008                 err = sock_intr_errno(timeo);
1009                 if (signal_pending(current))
1010                         break;
1011                 err = -EAGAIN;
1012                 if (!timeo)
1013                         break;
1014         }
1015         finish_wait(sk_sleep(sk), &wait);
1016         return err;
1017 }
1018
1019 /**
1020  * tipc_recvmsg - receive packet-oriented message
1021  * @iocb: (unused)
1022  * @m: descriptor for message info
1023  * @buf_len: total size of user buffer area
1024  * @flags: receive flags
1025  *
1026  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1027  * If the complete message doesn't fit in user area, truncate it.
1028  *
1029  * Returns size of returned message data, errno otherwise
1030  */
1031 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1032                         struct msghdr *m, size_t buf_len, int flags)
1033 {
1034         struct sock *sk = sock->sk;
1035         struct tipc_sock *tsk = tipc_sk(sk);
1036         struct tipc_port *port = &tsk->port;
1037         struct sk_buff *buf;
1038         struct tipc_msg *msg;
1039         long timeo;
1040         unsigned int sz;
1041         u32 err;
1042         int res;
1043
1044         /* Catch invalid receive requests */
1045         if (unlikely(!buf_len))
1046                 return -EINVAL;
1047
1048         lock_sock(sk);
1049
1050         if (unlikely(sock->state == SS_UNCONNECTED)) {
1051                 res = -ENOTCONN;
1052                 goto exit;
1053         }
1054
1055         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1056 restart:
1057
1058         /* Look for a message in receive queue; wait if necessary */
1059         res = tipc_wait_for_rcvmsg(sock, timeo);
1060         if (res)
1061                 goto exit;
1062
1063         /* Look at first message in receive queue */
1064         buf = skb_peek(&sk->sk_receive_queue);
1065         msg = buf_msg(buf);
1066         sz = msg_data_sz(msg);
1067         err = msg_errcode(msg);
1068
1069         /* Discard an empty non-errored message & try again */
1070         if ((!sz) && (!err)) {
1071                 advance_rx_queue(sk);
1072                 goto restart;
1073         }
1074
1075         /* Capture sender's address (optional) */
1076         set_orig_addr(m, msg);
1077
1078         /* Capture ancillary data (optional) */
1079         res = anc_data_recv(m, msg, port);
1080         if (res)
1081                 goto exit;
1082
1083         /* Capture message data (if valid) & compute return value (always) */
1084         if (!err) {
1085                 if (unlikely(buf_len < sz)) {
1086                         sz = buf_len;
1087                         m->msg_flags |= MSG_TRUNC;
1088                 }
1089                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1090                                               m->msg_iov, sz);
1091                 if (res)
1092                         goto exit;
1093                 res = sz;
1094         } else {
1095                 if ((sock->state == SS_READY) ||
1096                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1097                         res = 0;
1098                 else
1099                         res = -ECONNRESET;
1100         }
1101
1102         /* Consume received message (optional) */
1103         if (likely(!(flags & MSG_PEEK))) {
1104                 if ((sock->state != SS_READY) &&
1105                     (++port->conn_unacked >= TIPC_CONNACK_INTV))
1106                         tipc_acknowledge(port->ref, port->conn_unacked);
1107                 advance_rx_queue(sk);
1108         }
1109 exit:
1110         release_sock(sk);
1111         return res;
1112 }
1113
1114 /**
1115  * tipc_recv_stream - receive stream-oriented data
1116  * @iocb: (unused)
1117  * @m: descriptor for message info
1118  * @buf_len: total size of user buffer area
1119  * @flags: receive flags
1120  *
1121  * Used for SOCK_STREAM messages only.  If not enough data is available
1122  * will optionally wait for more; never truncates data.
1123  *
1124  * Returns size of returned message data, errno otherwise
1125  */
1126 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1127                             struct msghdr *m, size_t buf_len, int flags)
1128 {
1129         struct sock *sk = sock->sk;
1130         struct tipc_sock *tsk = tipc_sk(sk);
1131         struct tipc_port *port = &tsk->port;
1132         struct sk_buff *buf;
1133         struct tipc_msg *msg;
1134         long timeo;
1135         unsigned int sz;
1136         int sz_to_copy, target, needed;
1137         int sz_copied = 0;
1138         u32 err;
1139         int res = 0;
1140
1141         /* Catch invalid receive attempts */
1142         if (unlikely(!buf_len))
1143                 return -EINVAL;
1144
1145         lock_sock(sk);
1146
1147         if (unlikely(sock->state == SS_UNCONNECTED)) {
1148                 res = -ENOTCONN;
1149                 goto exit;
1150         }
1151
1152         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1153         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1154
1155 restart:
1156         /* Look for a message in receive queue; wait if necessary */
1157         res = tipc_wait_for_rcvmsg(sock, timeo);
1158         if (res)
1159                 goto exit;
1160
1161         /* Look at first message in receive queue */
1162         buf = skb_peek(&sk->sk_receive_queue);
1163         msg = buf_msg(buf);
1164         sz = msg_data_sz(msg);
1165         err = msg_errcode(msg);
1166
1167         /* Discard an empty non-errored message & try again */
1168         if ((!sz) && (!err)) {
1169                 advance_rx_queue(sk);
1170                 goto restart;
1171         }
1172
1173         /* Optionally capture sender's address & ancillary data of first msg */
1174         if (sz_copied == 0) {
1175                 set_orig_addr(m, msg);
1176                 res = anc_data_recv(m, msg, port);
1177                 if (res)
1178                         goto exit;
1179         }
1180
1181         /* Capture message data (if valid) & compute return value (always) */
1182         if (!err) {
1183                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1184
1185                 sz -= offset;
1186                 needed = (buf_len - sz_copied);
1187                 sz_to_copy = (sz <= needed) ? sz : needed;
1188
1189                 res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1190                                               m->msg_iov, sz_to_copy);
1191                 if (res)
1192                         goto exit;
1193
1194                 sz_copied += sz_to_copy;
1195
1196                 if (sz_to_copy < sz) {
1197                         if (!(flags & MSG_PEEK))
1198                                 TIPC_SKB_CB(buf)->handle =
1199                                 (void *)(unsigned long)(offset + sz_to_copy);
1200                         goto exit;
1201                 }
1202         } else {
1203                 if (sz_copied != 0)
1204                         goto exit; /* can't add error msg to valid data */
1205
1206                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1207                         res = 0;
1208                 else
1209                         res = -ECONNRESET;
1210         }
1211
1212         /* Consume received message (optional) */
1213         if (likely(!(flags & MSG_PEEK))) {
1214                 if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
1215                         tipc_acknowledge(port->ref, port->conn_unacked);
1216                 advance_rx_queue(sk);
1217         }
1218
1219         /* Loop around if more data is required */
1220         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1221             (!skb_queue_empty(&sk->sk_receive_queue) ||
1222             (sz_copied < target)) &&    /* and more is ready or required */
1223             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1224             (!err))                     /* and haven't reached a FIN */
1225                 goto restart;
1226
1227 exit:
1228         release_sock(sk);
1229         return sz_copied ? sz_copied : res;
1230 }
1231
1232 /**
1233  * tipc_write_space - wake up thread if port congestion is released
1234  * @sk: socket
1235  */
1236 static void tipc_write_space(struct sock *sk)
1237 {
1238         struct socket_wq *wq;
1239
1240         rcu_read_lock();
1241         wq = rcu_dereference(sk->sk_wq);
1242         if (wq_has_sleeper(wq))
1243                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1244                                                 POLLWRNORM | POLLWRBAND);
1245         rcu_read_unlock();
1246 }
1247
1248 /**
1249  * tipc_data_ready - wake up threads to indicate messages have been received
1250  * @sk: socket
1251  * @len: the length of messages
1252  */
1253 static void tipc_data_ready(struct sock *sk)
1254 {
1255         struct socket_wq *wq;
1256
1257         rcu_read_lock();
1258         wq = rcu_dereference(sk->sk_wq);
1259         if (wq_has_sleeper(wq))
1260                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1261                                                 POLLRDNORM | POLLRDBAND);
1262         rcu_read_unlock();
1263 }
1264
1265 /**
1266  * filter_connect - Handle all incoming messages for a connection-based socket
1267  * @tsk: TIPC socket
1268  * @msg: message
1269  *
1270  * Returns TIPC error status code and socket error status code
1271  * once it encounters some errors
1272  */
1273 static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1274 {
1275         struct sock *sk = &tsk->sk;
1276         struct tipc_port *port = &tsk->port;
1277         struct socket *sock = sk->sk_socket;
1278         struct tipc_msg *msg = buf_msg(*buf);
1279
1280         u32 retval = TIPC_ERR_NO_PORT;
1281         int res;
1282
1283         if (msg_mcast(msg))
1284                 return retval;
1285
1286         switch ((int)sock->state) {
1287         case SS_CONNECTED:
1288                 /* Accept only connection-based messages sent by peer */
1289                 if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1290                         if (unlikely(msg_errcode(msg))) {
1291                                 sock->state = SS_DISCONNECTING;
1292                                 __tipc_port_disconnect(port);
1293                         }
1294                         retval = TIPC_OK;
1295                 }
1296                 break;
1297         case SS_CONNECTING:
1298                 /* Accept only ACK or NACK message */
1299                 if (unlikely(msg_errcode(msg))) {
1300                         sock->state = SS_DISCONNECTING;
1301                         sk->sk_err = ECONNREFUSED;
1302                         retval = TIPC_OK;
1303                         break;
1304                 }
1305
1306                 if (unlikely(!msg_connected(msg)))
1307                         break;
1308
1309                 res = auto_connect(tsk, msg);
1310                 if (res) {
1311                         sock->state = SS_DISCONNECTING;
1312                         sk->sk_err = -res;
1313                         retval = TIPC_OK;
1314                         break;
1315                 }
1316
1317                 /* If an incoming message is an 'ACK-', it should be
1318                  * discarded here because it doesn't contain useful
1319                  * data. In addition, we should try to wake up
1320                  * connect() routine if sleeping.
1321                  */
1322                 if (msg_data_sz(msg) == 0) {
1323                         kfree_skb(*buf);
1324                         *buf = NULL;
1325                         if (waitqueue_active(sk_sleep(sk)))
1326                                 wake_up_interruptible(sk_sleep(sk));
1327                 }
1328                 retval = TIPC_OK;
1329                 break;
1330         case SS_LISTENING:
1331         case SS_UNCONNECTED:
1332                 /* Accept only SYN message */
1333                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1334                         retval = TIPC_OK;
1335                 break;
1336         case SS_DISCONNECTING:
1337                 break;
1338         default:
1339                 pr_err("Unknown socket state %u\n", sock->state);
1340         }
1341         return retval;
1342 }
1343
1344 /**
1345  * rcvbuf_limit - get proper overload limit of socket receive queue
1346  * @sk: socket
1347  * @buf: message
1348  *
1349  * For all connection oriented messages, irrespective of importance,
1350  * the default overload value (i.e. 67MB) is set as limit.
1351  *
1352  * For all connectionless messages, by default new queue limits are
1353  * as belows:
1354  *
1355  * TIPC_LOW_IMPORTANCE       (4 MB)
1356  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1357  * TIPC_HIGH_IMPORTANCE      (16 MB)
1358  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1359  *
1360  * Returns overload limit according to corresponding message importance
1361  */
1362 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1363 {
1364         struct tipc_msg *msg = buf_msg(buf);
1365
1366         if (msg_connected(msg))
1367                 return sysctl_tipc_rmem[2];
1368
1369         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1370                 msg_importance(msg);
1371 }
1372
1373 /**
1374  * filter_rcv - validate incoming message
1375  * @sk: socket
1376  * @buf: message
1377  *
1378  * Enqueues message on receive queue if acceptable; optionally handles
1379  * disconnect indication for a connected socket.
1380  *
1381  * Called with socket lock already taken; port lock may also be taken.
1382  *
1383  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1384  */
1385 static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1386 {
1387         struct socket *sock = sk->sk_socket;
1388         struct tipc_sock *tsk = tipc_sk(sk);
1389         struct tipc_msg *msg = buf_msg(buf);
1390         unsigned int limit = rcvbuf_limit(sk, buf);
1391         u32 res = TIPC_OK;
1392
1393         /* Reject message if it is wrong sort of message for socket */
1394         if (msg_type(msg) > TIPC_DIRECT_MSG)
1395                 return TIPC_ERR_NO_PORT;
1396
1397         if (sock->state == SS_READY) {
1398                 if (msg_connected(msg))
1399                         return TIPC_ERR_NO_PORT;
1400         } else {
1401                 res = filter_connect(tsk, &buf);
1402                 if (res != TIPC_OK || buf == NULL)
1403                         return res;
1404         }
1405
1406         /* Reject message if there isn't room to queue it */
1407         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1408                 return TIPC_ERR_OVERLOAD;
1409
1410         /* Enqueue message */
1411         TIPC_SKB_CB(buf)->handle = NULL;
1412         __skb_queue_tail(&sk->sk_receive_queue, buf);
1413         skb_set_owner_r(buf, sk);
1414
1415         sk->sk_data_ready(sk);
1416         return TIPC_OK;
1417 }
1418
1419 /**
1420  * tipc_backlog_rcv - handle incoming message from backlog queue
1421  * @sk: socket
1422  * @buf: message
1423  *
1424  * Caller must hold socket lock, but not port lock.
1425  *
1426  * Returns 0
1427  */
1428 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1429 {
1430         u32 res;
1431         struct tipc_sock *tsk = tipc_sk(sk);
1432
1433         res = filter_rcv(sk, buf);
1434         if (unlikely(res))
1435                 tipc_reject_msg(buf, res);
1436
1437         if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1438                 atomic_add(buf->truesize, &tsk->dupl_rcvcnt);
1439
1440         return 0;
1441 }
1442
1443 /**
1444  * tipc_sk_rcv - handle incoming message
1445  * @sk:  socket receiving message
1446  * @buf: message
1447  *
1448  * Called with port lock already taken.
1449  *
1450  * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1451  */
1452 u32 tipc_sk_rcv(struct sock *sk, struct sk_buff *buf)
1453 {
1454         struct tipc_sock *tsk = tipc_sk(sk);
1455         u32 res;
1456         uint limit;
1457         /*
1458          * Process message if socket is unlocked; otherwise add to backlog queue
1459          *
1460          * This code is based on sk_receive_skb(), but must be distinct from it
1461          * since a TIPC-specific filter/reject mechanism is utilized
1462          */
1463         bh_lock_sock(sk);
1464         if (!sock_owned_by_user(sk)) {
1465                 res = filter_rcv(sk, buf);
1466         } else {
1467                 if (sk->sk_backlog.len == 0)
1468                         atomic_set(&tsk->dupl_rcvcnt, 0);
1469                 limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1470                 if (sk_add_backlog(sk, buf, limit))
1471                         res = TIPC_ERR_OVERLOAD;
1472                 else
1473                         res = TIPC_OK;
1474         }
1475         bh_unlock_sock(sk);
1476
1477         return res;
1478 }
1479
1480 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1481 {
1482         struct sock *sk = sock->sk;
1483         DEFINE_WAIT(wait);
1484         int done;
1485
1486         do {
1487                 int err = sock_error(sk);
1488                 if (err)
1489                         return err;
1490                 if (!*timeo_p)
1491                         return -ETIMEDOUT;
1492                 if (signal_pending(current))
1493                         return sock_intr_errno(*timeo_p);
1494
1495                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1496                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1497                 finish_wait(sk_sleep(sk), &wait);
1498         } while (!done);
1499         return 0;
1500 }
1501
1502 /**
1503  * tipc_connect - establish a connection to another TIPC port
1504  * @sock: socket structure
1505  * @dest: socket address for destination port
1506  * @destlen: size of socket address data structure
1507  * @flags: file-related flags associated with socket
1508  *
1509  * Returns 0 on success, errno otherwise
1510  */
1511 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1512                         int destlen, int flags)
1513 {
1514         struct sock *sk = sock->sk;
1515         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1516         struct msghdr m = {NULL,};
1517         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1518         socket_state previous;
1519         int res;
1520
1521         lock_sock(sk);
1522
1523         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1524         if (sock->state == SS_READY) {
1525                 res = -EOPNOTSUPP;
1526                 goto exit;
1527         }
1528
1529         /*
1530          * Reject connection attempt using multicast address
1531          *
1532          * Note: send_msg() validates the rest of the address fields,
1533          *       so there's no need to do it here
1534          */
1535         if (dst->addrtype == TIPC_ADDR_MCAST) {
1536                 res = -EINVAL;
1537                 goto exit;
1538         }
1539
1540         previous = sock->state;
1541         switch (sock->state) {
1542         case SS_UNCONNECTED:
1543                 /* Send a 'SYN-' to destination */
1544                 m.msg_name = dest;
1545                 m.msg_namelen = destlen;
1546
1547                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1548                  * indicate send_msg() is never blocked.
1549                  */
1550                 if (!timeout)
1551                         m.msg_flags = MSG_DONTWAIT;
1552
1553                 res = tipc_sendmsg(NULL, sock, &m, 0);
1554                 if ((res < 0) && (res != -EWOULDBLOCK))
1555                         goto exit;
1556
1557                 /* Just entered SS_CONNECTING state; the only
1558                  * difference is that return value in non-blocking
1559                  * case is EINPROGRESS, rather than EALREADY.
1560                  */
1561                 res = -EINPROGRESS;
1562         case SS_CONNECTING:
1563                 if (previous == SS_CONNECTING)
1564                         res = -EALREADY;
1565                 if (!timeout)
1566                         goto exit;
1567                 timeout = msecs_to_jiffies(timeout);
1568                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1569                 res = tipc_wait_for_connect(sock, &timeout);
1570                 break;
1571         case SS_CONNECTED:
1572                 res = -EISCONN;
1573                 break;
1574         default:
1575                 res = -EINVAL;
1576                 break;
1577         }
1578 exit:
1579         release_sock(sk);
1580         return res;
1581 }
1582
1583 /**
1584  * tipc_listen - allow socket to listen for incoming connections
1585  * @sock: socket structure
1586  * @len: (unused)
1587  *
1588  * Returns 0 on success, errno otherwise
1589  */
1590 static int tipc_listen(struct socket *sock, int len)
1591 {
1592         struct sock *sk = sock->sk;
1593         int res;
1594
1595         lock_sock(sk);
1596
1597         if (sock->state != SS_UNCONNECTED)
1598                 res = -EINVAL;
1599         else {
1600                 sock->state = SS_LISTENING;
1601                 res = 0;
1602         }
1603
1604         release_sock(sk);
1605         return res;
1606 }
1607
1608 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1609 {
1610         struct sock *sk = sock->sk;
1611         DEFINE_WAIT(wait);
1612         int err;
1613
1614         /* True wake-one mechanism for incoming connections: only
1615          * one process gets woken up, not the 'whole herd'.
1616          * Since we do not 'race & poll' for established sockets
1617          * anymore, the common case will execute the loop only once.
1618         */
1619         for (;;) {
1620                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1621                                           TASK_INTERRUPTIBLE);
1622                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1623                         release_sock(sk);
1624                         timeo = schedule_timeout(timeo);
1625                         lock_sock(sk);
1626                 }
1627                 err = 0;
1628                 if (!skb_queue_empty(&sk->sk_receive_queue))
1629                         break;
1630                 err = -EINVAL;
1631                 if (sock->state != SS_LISTENING)
1632                         break;
1633                 err = sock_intr_errno(timeo);
1634                 if (signal_pending(current))
1635                         break;
1636                 err = -EAGAIN;
1637                 if (!timeo)
1638                         break;
1639         }
1640         finish_wait(sk_sleep(sk), &wait);
1641         return err;
1642 }
1643
1644 /**
1645  * tipc_accept - wait for connection request
1646  * @sock: listening socket
1647  * @newsock: new socket that is to be connected
1648  * @flags: file-related flags associated with socket
1649  *
1650  * Returns 0 on success, errno otherwise
1651  */
1652 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1653 {
1654         struct sock *new_sk, *sk = sock->sk;
1655         struct sk_buff *buf;
1656         struct tipc_port *new_port;
1657         struct tipc_msg *msg;
1658         struct tipc_portid peer;
1659         u32 new_ref;
1660         long timeo;
1661         int res;
1662
1663         lock_sock(sk);
1664
1665         if (sock->state != SS_LISTENING) {
1666                 res = -EINVAL;
1667                 goto exit;
1668         }
1669         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1670         res = tipc_wait_for_accept(sock, timeo);
1671         if (res)
1672                 goto exit;
1673
1674         buf = skb_peek(&sk->sk_receive_queue);
1675
1676         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1677         if (res)
1678                 goto exit;
1679
1680         new_sk = new_sock->sk;
1681         new_port = &tipc_sk(new_sk)->port;
1682         new_ref = new_port->ref;
1683         msg = buf_msg(buf);
1684
1685         /* we lock on new_sk; but lockdep sees the lock on sk */
1686         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1687
1688         /*
1689          * Reject any stray messages received by new socket
1690          * before the socket lock was taken (very, very unlikely)
1691          */
1692         reject_rx_queue(new_sk);
1693
1694         /* Connect new socket to it's peer */
1695         peer.ref = msg_origport(msg);
1696         peer.node = msg_orignode(msg);
1697         tipc_port_connect(new_ref, &peer);
1698         new_sock->state = SS_CONNECTED;
1699
1700         tipc_port_set_importance(new_port, msg_importance(msg));
1701         if (msg_named(msg)) {
1702                 new_port->conn_type = msg_nametype(msg);
1703                 new_port->conn_instance = msg_nameinst(msg);
1704         }
1705
1706         /*
1707          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1708          * Respond to 'SYN+' by queuing it on new socket.
1709          */
1710         if (!msg_data_sz(msg)) {
1711                 struct msghdr m = {NULL,};
1712
1713                 advance_rx_queue(sk);
1714                 tipc_send_packet(NULL, new_sock, &m, 0);
1715         } else {
1716                 __skb_dequeue(&sk->sk_receive_queue);
1717                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1718                 skb_set_owner_r(buf, new_sk);
1719         }
1720         release_sock(new_sk);
1721 exit:
1722         release_sock(sk);
1723         return res;
1724 }
1725
1726 /**
1727  * tipc_shutdown - shutdown socket connection
1728  * @sock: socket structure
1729  * @how: direction to close (must be SHUT_RDWR)
1730  *
1731  * Terminates connection (if necessary), then purges socket's receive queue.
1732  *
1733  * Returns 0 on success, errno otherwise
1734  */
1735 static int tipc_shutdown(struct socket *sock, int how)
1736 {
1737         struct sock *sk = sock->sk;
1738         struct tipc_sock *tsk = tipc_sk(sk);
1739         struct tipc_port *port = &tsk->port;
1740         struct sk_buff *buf;
1741         int res;
1742
1743         if (how != SHUT_RDWR)
1744                 return -EINVAL;
1745
1746         lock_sock(sk);
1747
1748         switch (sock->state) {
1749         case SS_CONNECTING:
1750         case SS_CONNECTED:
1751
1752 restart:
1753                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1754                 buf = __skb_dequeue(&sk->sk_receive_queue);
1755                 if (buf) {
1756                         if (TIPC_SKB_CB(buf)->handle != NULL) {
1757                                 kfree_skb(buf);
1758                                 goto restart;
1759                         }
1760                         tipc_port_disconnect(port->ref);
1761                         tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1762                 } else {
1763                         tipc_port_shutdown(port->ref);
1764                 }
1765
1766                 sock->state = SS_DISCONNECTING;
1767
1768                 /* fall through */
1769
1770         case SS_DISCONNECTING:
1771
1772                 /* Discard any unreceived messages */
1773                 __skb_queue_purge(&sk->sk_receive_queue);
1774
1775                 /* Wake up anyone sleeping in poll */
1776                 sk->sk_state_change(sk);
1777                 res = 0;
1778                 break;
1779
1780         default:
1781                 res = -ENOTCONN;
1782         }
1783
1784         release_sock(sk);
1785         return res;
1786 }
1787
1788 /**
1789  * tipc_setsockopt - set socket option
1790  * @sock: socket structure
1791  * @lvl: option level
1792  * @opt: option identifier
1793  * @ov: pointer to new option value
1794  * @ol: length of option value
1795  *
1796  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1797  * (to ease compatibility).
1798  *
1799  * Returns 0 on success, errno otherwise
1800  */
1801 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1802                            char __user *ov, unsigned int ol)
1803 {
1804         struct sock *sk = sock->sk;
1805         struct tipc_sock *tsk = tipc_sk(sk);
1806         struct tipc_port *port = &tsk->port;
1807         u32 value;
1808         int res;
1809
1810         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1811                 return 0;
1812         if (lvl != SOL_TIPC)
1813                 return -ENOPROTOOPT;
1814         if (ol < sizeof(value))
1815                 return -EINVAL;
1816         res = get_user(value, (u32 __user *)ov);
1817         if (res)
1818                 return res;
1819
1820         lock_sock(sk);
1821
1822         switch (opt) {
1823         case TIPC_IMPORTANCE:
1824                 tipc_port_set_importance(port, value);
1825                 break;
1826         case TIPC_SRC_DROPPABLE:
1827                 if (sock->type != SOCK_STREAM)
1828                         tipc_port_set_unreliable(port, value);
1829                 else
1830                         res = -ENOPROTOOPT;
1831                 break;
1832         case TIPC_DEST_DROPPABLE:
1833                 tipc_port_set_unreturnable(port, value);
1834                 break;
1835         case TIPC_CONN_TIMEOUT:
1836                 tipc_sk(sk)->conn_timeout = value;
1837                 /* no need to set "res", since already 0 at this point */
1838                 break;
1839         default:
1840                 res = -EINVAL;
1841         }
1842
1843         release_sock(sk);
1844
1845         return res;
1846 }
1847
1848 /**
1849  * tipc_getsockopt - get socket option
1850  * @sock: socket structure
1851  * @lvl: option level
1852  * @opt: option identifier
1853  * @ov: receptacle for option value
1854  * @ol: receptacle for length of option value
1855  *
1856  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1857  * (to ease compatibility).
1858  *
1859  * Returns 0 on success, errno otherwise
1860  */
1861 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1862                            char __user *ov, int __user *ol)
1863 {
1864         struct sock *sk = sock->sk;
1865         struct tipc_sock *tsk = tipc_sk(sk);
1866         struct tipc_port *port = &tsk->port;
1867         int len;
1868         u32 value;
1869         int res;
1870
1871         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1872                 return put_user(0, ol);
1873         if (lvl != SOL_TIPC)
1874                 return -ENOPROTOOPT;
1875         res = get_user(len, ol);
1876         if (res)
1877                 return res;
1878
1879         lock_sock(sk);
1880
1881         switch (opt) {
1882         case TIPC_IMPORTANCE:
1883                 value = tipc_port_importance(port);
1884                 break;
1885         case TIPC_SRC_DROPPABLE:
1886                 value = tipc_port_unreliable(port);
1887                 break;
1888         case TIPC_DEST_DROPPABLE:
1889                 value = tipc_port_unreturnable(port);
1890                 break;
1891         case TIPC_CONN_TIMEOUT:
1892                 value = tipc_sk(sk)->conn_timeout;
1893                 /* no need to set "res", since already 0 at this point */
1894                 break;
1895         case TIPC_NODE_RECVQ_DEPTH:
1896                 value = 0; /* was tipc_queue_size, now obsolete */
1897                 break;
1898         case TIPC_SOCK_RECVQ_DEPTH:
1899                 value = skb_queue_len(&sk->sk_receive_queue);
1900                 break;
1901         default:
1902                 res = -EINVAL;
1903         }
1904
1905         release_sock(sk);
1906
1907         if (res)
1908                 return res;     /* "get" failed */
1909
1910         if (len < sizeof(value))
1911                 return -EINVAL;
1912
1913         if (copy_to_user(ov, &value, sizeof(value)))
1914                 return -EFAULT;
1915
1916         return put_user(sizeof(value), ol);
1917 }
1918
1919 int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1920 {
1921         struct tipc_sioc_ln_req lnr;
1922         void __user *argp = (void __user *)arg;
1923
1924         switch (cmd) {
1925         case SIOCGETLINKNAME:
1926                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
1927                         return -EFAULT;
1928                 if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
1929                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
1930                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
1931                                 return -EFAULT;
1932                         return 0;
1933                 }
1934                 return -EADDRNOTAVAIL;
1935                 break;
1936         default:
1937                 return -ENOIOCTLCMD;
1938         }
1939 }
1940
1941 /* Protocol switches for the various types of TIPC sockets */
1942
1943 static const struct proto_ops msg_ops = {
1944         .owner          = THIS_MODULE,
1945         .family         = AF_TIPC,
1946         .release        = tipc_release,
1947         .bind           = tipc_bind,
1948         .connect        = tipc_connect,
1949         .socketpair     = sock_no_socketpair,
1950         .accept         = sock_no_accept,
1951         .getname        = tipc_getname,
1952         .poll           = tipc_poll,
1953         .ioctl          = tipc_ioctl,
1954         .listen         = sock_no_listen,
1955         .shutdown       = tipc_shutdown,
1956         .setsockopt     = tipc_setsockopt,
1957         .getsockopt     = tipc_getsockopt,
1958         .sendmsg        = tipc_sendmsg,
1959         .recvmsg        = tipc_recvmsg,
1960         .mmap           = sock_no_mmap,
1961         .sendpage       = sock_no_sendpage
1962 };
1963
1964 static const struct proto_ops packet_ops = {
1965         .owner          = THIS_MODULE,
1966         .family         = AF_TIPC,
1967         .release        = tipc_release,
1968         .bind           = tipc_bind,
1969         .connect        = tipc_connect,
1970         .socketpair     = sock_no_socketpair,
1971         .accept         = tipc_accept,
1972         .getname        = tipc_getname,
1973         .poll           = tipc_poll,
1974         .ioctl          = tipc_ioctl,
1975         .listen         = tipc_listen,
1976         .shutdown       = tipc_shutdown,
1977         .setsockopt     = tipc_setsockopt,
1978         .getsockopt     = tipc_getsockopt,
1979         .sendmsg        = tipc_send_packet,
1980         .recvmsg        = tipc_recvmsg,
1981         .mmap           = sock_no_mmap,
1982         .sendpage       = sock_no_sendpage
1983 };
1984
1985 static const struct proto_ops stream_ops = {
1986         .owner          = THIS_MODULE,
1987         .family         = AF_TIPC,
1988         .release        = tipc_release,
1989         .bind           = tipc_bind,
1990         .connect        = tipc_connect,
1991         .socketpair     = sock_no_socketpair,
1992         .accept         = tipc_accept,
1993         .getname        = tipc_getname,
1994         .poll           = tipc_poll,
1995         .ioctl          = tipc_ioctl,
1996         .listen         = tipc_listen,
1997         .shutdown       = tipc_shutdown,
1998         .setsockopt     = tipc_setsockopt,
1999         .getsockopt     = tipc_getsockopt,
2000         .sendmsg        = tipc_send_stream,
2001         .recvmsg        = tipc_recv_stream,
2002         .mmap           = sock_no_mmap,
2003         .sendpage       = sock_no_sendpage
2004 };
2005
2006 static const struct net_proto_family tipc_family_ops = {
2007         .owner          = THIS_MODULE,
2008         .family         = AF_TIPC,
2009         .create         = tipc_sk_create
2010 };
2011
2012 static struct proto tipc_proto = {
2013         .name           = "TIPC",
2014         .owner          = THIS_MODULE,
2015         .obj_size       = sizeof(struct tipc_sock),
2016         .sysctl_rmem    = sysctl_tipc_rmem
2017 };
2018
2019 static struct proto tipc_proto_kern = {
2020         .name           = "TIPC",
2021         .obj_size       = sizeof(struct tipc_sock),
2022         .sysctl_rmem    = sysctl_tipc_rmem
2023 };
2024
2025 /**
2026  * tipc_socket_init - initialize TIPC socket interface
2027  *
2028  * Returns 0 on success, errno otherwise
2029  */
2030 int tipc_socket_init(void)
2031 {
2032         int res;
2033
2034         res = proto_register(&tipc_proto, 1);
2035         if (res) {
2036                 pr_err("Failed to register TIPC protocol type\n");
2037                 goto out;
2038         }
2039
2040         res = sock_register(&tipc_family_ops);
2041         if (res) {
2042                 pr_err("Failed to register TIPC socket type\n");
2043                 proto_unregister(&tipc_proto);
2044                 goto out;
2045         }
2046  out:
2047         return res;
2048 }
2049
2050 /**
2051  * tipc_socket_stop - stop TIPC socket interface
2052  */
2053 void tipc_socket_stop(void)
2054 {
2055         sock_unregister(tipc_family_ops.family);
2056         proto_unregister(&tipc_proto);
2057 }