tipc: make subscriber server support net namespace
[cascardo/linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/jhash.h>
39 #include "core.h"
40 #include "name_table.h"
41 #include "node.h"
42 #include "link.h"
43 #include "config.h"
44 #include "socket.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_intv:
72  * @timer:
73  * @port: port - interacts with 'sk' and with the rest of the TIPC stack
74  * @peer_name: the peer of the connection, if any
75  * @conn_timeout: the time we can wait for an unresponded setup request
76  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
77  * @link_cong: non-zero if owner must sleep because of link congestion
78  * @sent_unacked: # messages sent by socket, and not yet acked by peer
79  * @rcv_unacked: # messages read by user, but not yet acked back to peer
80  * @node: hash table node
81  * @rcu: rcu struct for tipc_sock
82  */
83 struct tipc_sock {
84         struct sock sk;
85         int connected;
86         u32 conn_type;
87         u32 conn_instance;
88         int published;
89         u32 max_pkt;
90         u32 portid;
91         struct tipc_msg phdr;
92         struct list_head sock_list;
93         struct list_head publications;
94         u32 pub_count;
95         u32 probing_state;
96         unsigned long probing_intv;
97         struct timer_list timer;
98         uint conn_timeout;
99         atomic_t dupl_rcvcnt;
100         bool link_cong;
101         uint sent_unacked;
102         uint rcv_unacked;
103         struct rhash_head node;
104         struct rcu_head rcu;
105 };
106
107 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
108 static void tipc_data_ready(struct sock *sk);
109 static void tipc_write_space(struct sock *sk);
110 static int tipc_release(struct socket *sock);
111 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
112 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
113 static void tipc_sk_timeout(unsigned long data);
114 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
115                            struct tipc_name_seq const *seq);
116 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
117                             struct tipc_name_seq const *seq);
118 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
119 static int tipc_sk_insert(struct tipc_sock *tsk);
120 static void tipc_sk_remove(struct tipc_sock *tsk);
121
122 static const struct proto_ops packet_ops;
123 static const struct proto_ops stream_ops;
124 static const struct proto_ops msg_ops;
125
126 static struct proto tipc_proto;
127 static struct proto tipc_proto_kern;
128
129 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
130         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
131         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
132         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
133         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
134         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
135 };
136
137 /*
138  * Revised TIPC socket locking policy:
139  *
140  * Most socket operations take the standard socket lock when they start
141  * and hold it until they finish (or until they need to sleep).  Acquiring
142  * this lock grants the owner exclusive access to the fields of the socket
143  * data structures, with the exception of the backlog queue.  A few socket
144  * operations can be done without taking the socket lock because they only
145  * read socket information that never changes during the life of the socket.
146  *
147  * Socket operations may acquire the lock for the associated TIPC port if they
148  * need to perform an operation on the port.  If any routine needs to acquire
149  * both the socket lock and the port lock it must take the socket lock first
150  * to avoid the risk of deadlock.
151  *
152  * The dispatcher handling incoming messages cannot grab the socket lock in
153  * the standard fashion, since invoked it runs at the BH level and cannot block.
154  * Instead, it checks to see if the socket lock is currently owned by someone,
155  * and either handles the message itself or adds it to the socket's backlog
156  * queue; in the latter case the queued message is processed once the process
157  * owning the socket lock releases it.
158  *
159  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
160  * the problem of a blocked socket operation preventing any other operations
161  * from occurring.  However, applications must be careful if they have
162  * multiple threads trying to send (or receive) on the same socket, as these
163  * operations might interfere with each other.  For example, doing a connect
164  * and a receive at the same time might allow the receive to consume the
165  * ACK message meant for the connect.  While additional work could be done
166  * to try and overcome this, it doesn't seem to be worthwhile at the present.
167  *
168  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
169  * that another operation that must be performed in a non-blocking manner is
170  * not delayed for very long because the lock has already been taken.
171  *
172  * NOTE: This code assumes that certain fields of a port/socket pair are
173  * constant over its lifetime; such fields can be examined without taking
174  * the socket lock and/or port lock, and do not need to be re-read even
175  * after resuming processing after waiting.  These fields include:
176  *   - socket type
177  *   - pointer to socket sk structure (aka tipc_sock structure)
178  *   - pointer to port structure
179  *   - port reference
180  */
181
182 static u32 tsk_peer_node(struct tipc_sock *tsk)
183 {
184         return msg_destnode(&tsk->phdr);
185 }
186
187 static u32 tsk_peer_port(struct tipc_sock *tsk)
188 {
189         return msg_destport(&tsk->phdr);
190 }
191
192 static  bool tsk_unreliable(struct tipc_sock *tsk)
193 {
194         return msg_src_droppable(&tsk->phdr) != 0;
195 }
196
197 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
198 {
199         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
200 }
201
202 static bool tsk_unreturnable(struct tipc_sock *tsk)
203 {
204         return msg_dest_droppable(&tsk->phdr) != 0;
205 }
206
207 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
208 {
209         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
210 }
211
212 static int tsk_importance(struct tipc_sock *tsk)
213 {
214         return msg_importance(&tsk->phdr);
215 }
216
217 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
218 {
219         if (imp > TIPC_CRITICAL_IMPORTANCE)
220                 return -EINVAL;
221         msg_set_importance(&tsk->phdr, (u32)imp);
222         return 0;
223 }
224
225 static struct tipc_sock *tipc_sk(const struct sock *sk)
226 {
227         return container_of(sk, struct tipc_sock, sk);
228 }
229
230 static int tsk_conn_cong(struct tipc_sock *tsk)
231 {
232         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
233 }
234
235 /**
236  * tsk_advance_rx_queue - discard first buffer in socket receive queue
237  *
238  * Caller must hold socket lock
239  */
240 static void tsk_advance_rx_queue(struct sock *sk)
241 {
242         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
243 }
244
245 /**
246  * tsk_rej_rx_queue - reject all buffers in socket receive queue
247  *
248  * Caller must hold socket lock
249  */
250 static void tsk_rej_rx_queue(struct sock *sk)
251 {
252         struct sk_buff *skb;
253         u32 dnode;
254         struct net *net = sock_net(sk);
255
256         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
257                 if (tipc_msg_reverse(net, skb, &dnode, TIPC_ERR_NO_PORT))
258                         tipc_link_xmit_skb(net, skb, dnode, 0);
259         }
260 }
261
262 /* tsk_peer_msg - verify if message was sent by connected port's peer
263  *
264  * Handles cases where the node's network address has changed from
265  * the default of <0.0.0> to its configured setting.
266  */
267 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
268 {
269         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
270         u32 peer_port = tsk_peer_port(tsk);
271         u32 orig_node;
272         u32 peer_node;
273
274         if (unlikely(!tsk->connected))
275                 return false;
276
277         if (unlikely(msg_origport(msg) != peer_port))
278                 return false;
279
280         orig_node = msg_orignode(msg);
281         peer_node = tsk_peer_node(tsk);
282
283         if (likely(orig_node == peer_node))
284                 return true;
285
286         if (!orig_node && (peer_node == tn->own_addr))
287                 return true;
288
289         if (!peer_node && (orig_node == tn->own_addr))
290                 return true;
291
292         return false;
293 }
294
295 /**
296  * tipc_sk_create - create a TIPC socket
297  * @net: network namespace (must be default network)
298  * @sock: pre-allocated socket structure
299  * @protocol: protocol indicator (must be 0)
300  * @kern: caused by kernel or by userspace?
301  *
302  * This routine creates additional data structures used by the TIPC socket,
303  * initializes them, and links them together.
304  *
305  * Returns 0 on success, errno otherwise
306  */
307 static int tipc_sk_create(struct net *net, struct socket *sock,
308                           int protocol, int kern)
309 {
310         const struct proto_ops *ops;
311         socket_state state;
312         struct sock *sk;
313         struct tipc_sock *tsk;
314         struct tipc_msg *msg;
315
316         /* Validate arguments */
317         if (unlikely(protocol != 0))
318                 return -EPROTONOSUPPORT;
319
320         switch (sock->type) {
321         case SOCK_STREAM:
322                 ops = &stream_ops;
323                 state = SS_UNCONNECTED;
324                 break;
325         case SOCK_SEQPACKET:
326                 ops = &packet_ops;
327                 state = SS_UNCONNECTED;
328                 break;
329         case SOCK_DGRAM:
330         case SOCK_RDM:
331                 ops = &msg_ops;
332                 state = SS_READY;
333                 break;
334         default:
335                 return -EPROTOTYPE;
336         }
337
338         /* Allocate socket's protocol area */
339         if (!kern)
340                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
341         else
342                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
343
344         if (sk == NULL)
345                 return -ENOMEM;
346
347         tsk = tipc_sk(sk);
348         tsk->max_pkt = MAX_PKT_DEFAULT;
349         INIT_LIST_HEAD(&tsk->publications);
350         msg = &tsk->phdr;
351         tipc_msg_init(net, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
352                       NAMED_H_SIZE, 0);
353
354         /* Finish initializing socket data structures */
355         sock->ops = ops;
356         sock->state = state;
357         sock_init_data(sock, sk);
358         if (tipc_sk_insert(tsk)) {
359                 pr_warn("Socket create failed; port numbrer exhausted\n");
360                 return -EINVAL;
361         }
362         msg_set_origport(msg, tsk->portid);
363         setup_timer(&tsk->timer, tipc_sk_timeout, (unsigned long)tsk);
364         sk->sk_backlog_rcv = tipc_backlog_rcv;
365         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
366         sk->sk_data_ready = tipc_data_ready;
367         sk->sk_write_space = tipc_write_space;
368         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
369         tsk->sent_unacked = 0;
370         atomic_set(&tsk->dupl_rcvcnt, 0);
371
372         if (sock->state == SS_READY) {
373                 tsk_set_unreturnable(tsk, true);
374                 if (sock->type == SOCK_DGRAM)
375                         tsk_set_unreliable(tsk, true);
376         }
377         return 0;
378 }
379
380 /**
381  * tipc_sock_create_local - create TIPC socket from inside TIPC module
382  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
383  *
384  * We cannot use sock_creat_kern here because it bumps module user count.
385  * Since socket owner and creator is the same module we must make sure
386  * that module count remains zero for module local sockets, otherwise
387  * we cannot do rmmod.
388  *
389  * Returns 0 on success, errno otherwise
390  */
391 int tipc_sock_create_local(struct net *net, int type, struct socket **res)
392 {
393         int rc;
394
395         rc = sock_create_lite(AF_TIPC, type, 0, res);
396         if (rc < 0) {
397                 pr_err("Failed to create kernel socket\n");
398                 return rc;
399         }
400         tipc_sk_create(net, *res, 0, 1);
401
402         return 0;
403 }
404
405 /**
406  * tipc_sock_release_local - release socket created by tipc_sock_create_local
407  * @sock: the socket to be released.
408  *
409  * Module reference count is not incremented when such sockets are created,
410  * so we must keep it from being decremented when they are released.
411  */
412 void tipc_sock_release_local(struct socket *sock)
413 {
414         tipc_release(sock);
415         sock->ops = NULL;
416         sock_release(sock);
417 }
418
419 /**
420  * tipc_sock_accept_local - accept a connection on a socket created
421  * with tipc_sock_create_local. Use this function to avoid that
422  * module reference count is inadvertently incremented.
423  *
424  * @sock:    the accepting socket
425  * @newsock: reference to the new socket to be created
426  * @flags:   socket flags
427  */
428
429 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
430                            int flags)
431 {
432         struct sock *sk = sock->sk;
433         int ret;
434
435         ret = sock_create_lite(sk->sk_family, sk->sk_type,
436                                sk->sk_protocol, newsock);
437         if (ret < 0)
438                 return ret;
439
440         ret = tipc_accept(sock, *newsock, flags);
441         if (ret < 0) {
442                 sock_release(*newsock);
443                 return ret;
444         }
445         (*newsock)->ops = sock->ops;
446         return ret;
447 }
448
449 static void tipc_sk_callback(struct rcu_head *head)
450 {
451         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
452
453         sock_put(&tsk->sk);
454 }
455
456 /**
457  * tipc_release - destroy a TIPC socket
458  * @sock: socket to destroy
459  *
460  * This routine cleans up any messages that are still queued on the socket.
461  * For DGRAM and RDM socket types, all queued messages are rejected.
462  * For SEQPACKET and STREAM socket types, the first message is rejected
463  * and any others are discarded.  (If the first message on a STREAM socket
464  * is partially-read, it is discarded and the next one is rejected instead.)
465  *
466  * NOTE: Rejected messages are not necessarily returned to the sender!  They
467  * are returned or discarded according to the "destination droppable" setting
468  * specified for the message by the sender.
469  *
470  * Returns 0 on success, errno otherwise
471  */
472 static int tipc_release(struct socket *sock)
473 {
474         struct sock *sk = sock->sk;
475         struct net *net = sock_net(sk);
476         struct tipc_net *tn = net_generic(net, tipc_net_id);
477         struct tipc_sock *tsk;
478         struct sk_buff *skb;
479         u32 dnode, probing_state;
480
481         /*
482          * Exit if socket isn't fully initialized (occurs when a failed accept()
483          * releases a pre-allocated child socket that was never used)
484          */
485         if (sk == NULL)
486                 return 0;
487
488         tsk = tipc_sk(sk);
489         lock_sock(sk);
490
491         /*
492          * Reject all unreceived messages, except on an active connection
493          * (which disconnects locally & sends a 'FIN+' to peer)
494          */
495         dnode = tsk_peer_node(tsk);
496         while (sock->state != SS_DISCONNECTING) {
497                 skb = __skb_dequeue(&sk->sk_receive_queue);
498                 if (skb == NULL)
499                         break;
500                 if (TIPC_SKB_CB(skb)->handle != NULL)
501                         kfree_skb(skb);
502                 else {
503                         if ((sock->state == SS_CONNECTING) ||
504                             (sock->state == SS_CONNECTED)) {
505                                 sock->state = SS_DISCONNECTING;
506                                 tsk->connected = 0;
507                                 tipc_node_remove_conn(net, dnode, tsk->portid);
508                         }
509                         if (tipc_msg_reverse(net, skb, &dnode,
510                                              TIPC_ERR_NO_PORT))
511                                 tipc_link_xmit_skb(net, skb, dnode, 0);
512                 }
513         }
514
515         tipc_sk_withdraw(tsk, 0, NULL);
516         probing_state = tsk->probing_state;
517         if (del_timer_sync(&tsk->timer) && probing_state != TIPC_CONN_PROBING)
518                 sock_put(sk);
519         tipc_sk_remove(tsk);
520         if (tsk->connected) {
521                 skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
522                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
523                                       tn->own_addr, tsk_peer_port(tsk),
524                                       tsk->portid, TIPC_ERR_NO_PORT);
525                 if (skb)
526                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
527                 tipc_node_remove_conn(net, dnode, tsk->portid);
528         }
529
530         /* Discard any remaining (connection-based) messages in receive queue */
531         __skb_queue_purge(&sk->sk_receive_queue);
532
533         /* Reject any messages that accumulated in backlog queue */
534         sock->state = SS_DISCONNECTING;
535         release_sock(sk);
536
537         call_rcu(&tsk->rcu, tipc_sk_callback);
538         sock->sk = NULL;
539
540         return 0;
541 }
542
543 /**
544  * tipc_bind - associate or disassocate TIPC name(s) with a socket
545  * @sock: socket structure
546  * @uaddr: socket address describing name(s) and desired operation
547  * @uaddr_len: size of socket address data structure
548  *
549  * Name and name sequence binding is indicated using a positive scope value;
550  * a negative scope value unbinds the specified name.  Specifying no name
551  * (i.e. a socket address length of 0) unbinds all names from the socket.
552  *
553  * Returns 0 on success, errno otherwise
554  *
555  * NOTE: This routine doesn't need to take the socket lock since it doesn't
556  *       access any non-constant socket information.
557  */
558 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
559                      int uaddr_len)
560 {
561         struct sock *sk = sock->sk;
562         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
563         struct tipc_sock *tsk = tipc_sk(sk);
564         int res = -EINVAL;
565
566         lock_sock(sk);
567         if (unlikely(!uaddr_len)) {
568                 res = tipc_sk_withdraw(tsk, 0, NULL);
569                 goto exit;
570         }
571
572         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
573                 res = -EINVAL;
574                 goto exit;
575         }
576         if (addr->family != AF_TIPC) {
577                 res = -EAFNOSUPPORT;
578                 goto exit;
579         }
580
581         if (addr->addrtype == TIPC_ADDR_NAME)
582                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
583         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
584                 res = -EAFNOSUPPORT;
585                 goto exit;
586         }
587
588         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
589             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
590             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
591                 res = -EACCES;
592                 goto exit;
593         }
594
595         res = (addr->scope > 0) ?
596                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
597                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
598 exit:
599         release_sock(sk);
600         return res;
601 }
602
603 /**
604  * tipc_getname - get port ID of socket or peer socket
605  * @sock: socket structure
606  * @uaddr: area for returned socket address
607  * @uaddr_len: area for returned length of socket address
608  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
609  *
610  * Returns 0 on success, errno otherwise
611  *
612  * NOTE: This routine doesn't need to take the socket lock since it only
613  *       accesses socket information that is unchanging (or which changes in
614  *       a completely predictable manner).
615  */
616 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
617                         int *uaddr_len, int peer)
618 {
619         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
620         struct tipc_sock *tsk = tipc_sk(sock->sk);
621         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
622
623         memset(addr, 0, sizeof(*addr));
624         if (peer) {
625                 if ((sock->state != SS_CONNECTED) &&
626                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
627                         return -ENOTCONN;
628                 addr->addr.id.ref = tsk_peer_port(tsk);
629                 addr->addr.id.node = tsk_peer_node(tsk);
630         } else {
631                 addr->addr.id.ref = tsk->portid;
632                 addr->addr.id.node = tn->own_addr;
633         }
634
635         *uaddr_len = sizeof(*addr);
636         addr->addrtype = TIPC_ADDR_ID;
637         addr->family = AF_TIPC;
638         addr->scope = 0;
639         addr->addr.name.domain = 0;
640
641         return 0;
642 }
643
644 /**
645  * tipc_poll - read and possibly block on pollmask
646  * @file: file structure associated with the socket
647  * @sock: socket for which to calculate the poll bits
648  * @wait: ???
649  *
650  * Returns pollmask value
651  *
652  * COMMENTARY:
653  * It appears that the usual socket locking mechanisms are not useful here
654  * since the pollmask info is potentially out-of-date the moment this routine
655  * exits.  TCP and other protocols seem to rely on higher level poll routines
656  * to handle any preventable race conditions, so TIPC will do the same ...
657  *
658  * TIPC sets the returned events as follows:
659  *
660  * socket state         flags set
661  * ------------         ---------
662  * unconnected          no read flags
663  *                      POLLOUT if port is not congested
664  *
665  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
666  *                      no write flags
667  *
668  * connected            POLLIN/POLLRDNORM if data in rx queue
669  *                      POLLOUT if port is not congested
670  *
671  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
672  *                      no write flags
673  *
674  * listening            POLLIN if SYN in rx queue
675  *                      no write flags
676  *
677  * ready                POLLIN/POLLRDNORM if data in rx queue
678  * [connectionless]     POLLOUT (since port cannot be congested)
679  *
680  * IMPORTANT: The fact that a read or write operation is indicated does NOT
681  * imply that the operation will succeed, merely that it should be performed
682  * and will not block.
683  */
684 static unsigned int tipc_poll(struct file *file, struct socket *sock,
685                               poll_table *wait)
686 {
687         struct sock *sk = sock->sk;
688         struct tipc_sock *tsk = tipc_sk(sk);
689         u32 mask = 0;
690
691         sock_poll_wait(file, sk_sleep(sk), wait);
692
693         switch ((int)sock->state) {
694         case SS_UNCONNECTED:
695                 if (!tsk->link_cong)
696                         mask |= POLLOUT;
697                 break;
698         case SS_READY:
699         case SS_CONNECTED:
700                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
701                         mask |= POLLOUT;
702                 /* fall thru' */
703         case SS_CONNECTING:
704         case SS_LISTENING:
705                 if (!skb_queue_empty(&sk->sk_receive_queue))
706                         mask |= (POLLIN | POLLRDNORM);
707                 break;
708         case SS_DISCONNECTING:
709                 mask = (POLLIN | POLLRDNORM | POLLHUP);
710                 break;
711         }
712
713         return mask;
714 }
715
716 /**
717  * tipc_sendmcast - send multicast message
718  * @sock: socket structure
719  * @seq: destination address
720  * @msg: message to send
721  * @dsz: total length of message data
722  * @timeo: timeout to wait for wakeup
723  *
724  * Called from function tipc_sendmsg(), which has done all sanity checks
725  * Returns the number of bytes sent on success, or errno
726  */
727 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
728                           struct msghdr *msg, size_t dsz, long timeo)
729 {
730         struct sock *sk = sock->sk;
731         struct net *net = sock_net(sk);
732         struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
733         struct sk_buff_head head;
734         uint mtu;
735         int rc;
736
737         msg_set_type(mhdr, TIPC_MCAST_MSG);
738         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
739         msg_set_destport(mhdr, 0);
740         msg_set_destnode(mhdr, 0);
741         msg_set_nametype(mhdr, seq->type);
742         msg_set_namelower(mhdr, seq->lower);
743         msg_set_nameupper(mhdr, seq->upper);
744         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
745
746 new_mtu:
747         mtu = tipc_bclink_get_mtu();
748         __skb_queue_head_init(&head);
749         rc = tipc_msg_build(net, mhdr, msg, 0, dsz, mtu, &head);
750         if (unlikely(rc < 0))
751                 return rc;
752
753         do {
754                 rc = tipc_bclink_xmit(net, &head);
755                 if (likely(rc >= 0)) {
756                         rc = dsz;
757                         break;
758                 }
759                 if (rc == -EMSGSIZE)
760                         goto new_mtu;
761                 if (rc != -ELINKCONG)
762                         break;
763                 tipc_sk(sk)->link_cong = 1;
764                 rc = tipc_wait_for_sndmsg(sock, &timeo);
765                 if (rc)
766                         __skb_queue_purge(&head);
767         } while (!rc);
768         return rc;
769 }
770
771 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
772  */
773 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
774 {
775         struct tipc_msg *msg = buf_msg(buf);
776         struct tipc_port_list dports = {0, NULL, };
777         struct tipc_port_list *item;
778         struct sk_buff *b;
779         uint i, last, dst = 0;
780         u32 scope = TIPC_CLUSTER_SCOPE;
781
782         if (in_own_node(net, msg_orignode(msg)))
783                 scope = TIPC_NODE_SCOPE;
784
785         /* Create destination port list: */
786         tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
787                                   msg_nameupper(msg), scope, &dports);
788         last = dports.count;
789         if (!last) {
790                 kfree_skb(buf);
791                 return;
792         }
793
794         for (item = &dports; item; item = item->next) {
795                 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
796                         b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
797                         if (!b) {
798                                 pr_warn("Failed do clone mcast rcv buffer\n");
799                                 continue;
800                         }
801                         msg_set_destport(msg, item->ports[i]);
802                         tipc_sk_rcv(net, b);
803                 }
804         }
805         tipc_port_list_free(&dports);
806 }
807
808 /**
809  * tipc_sk_proto_rcv - receive a connection mng protocol message
810  * @tsk: receiving socket
811  * @dnode: node to send response message to, if any
812  * @buf: buffer containing protocol message
813  * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
814  * (CONN_PROBE_REPLY) message should be forwarded.
815  */
816 static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
817                              struct sk_buff *buf)
818 {
819         struct tipc_msg *msg = buf_msg(buf);
820         int conn_cong;
821
822         /* Ignore if connection cannot be validated: */
823         if (!tsk_peer_msg(tsk, msg))
824                 goto exit;
825
826         tsk->probing_state = TIPC_CONN_OK;
827
828         if (msg_type(msg) == CONN_ACK) {
829                 conn_cong = tsk_conn_cong(tsk);
830                 tsk->sent_unacked -= msg_msgcnt(msg);
831                 if (conn_cong)
832                         tsk->sk.sk_write_space(&tsk->sk);
833         } else if (msg_type(msg) == CONN_PROBE) {
834                 if (!tipc_msg_reverse(sock_net(&tsk->sk), buf, dnode, TIPC_OK))
835                         return TIPC_OK;
836                 msg_set_type(msg, CONN_PROBE_REPLY);
837                 return TIPC_FWD_MSG;
838         }
839         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
840 exit:
841         kfree_skb(buf);
842         return TIPC_OK;
843 }
844
845 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
846 {
847         struct sock *sk = sock->sk;
848         struct tipc_sock *tsk = tipc_sk(sk);
849         DEFINE_WAIT(wait);
850         int done;
851
852         do {
853                 int err = sock_error(sk);
854                 if (err)
855                         return err;
856                 if (sock->state == SS_DISCONNECTING)
857                         return -EPIPE;
858                 if (!*timeo_p)
859                         return -EAGAIN;
860                 if (signal_pending(current))
861                         return sock_intr_errno(*timeo_p);
862
863                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
864                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
865                 finish_wait(sk_sleep(sk), &wait);
866         } while (!done);
867         return 0;
868 }
869
870 /**
871  * tipc_sendmsg - send message in connectionless manner
872  * @iocb: if NULL, indicates that socket lock is already held
873  * @sock: socket structure
874  * @m: message to send
875  * @dsz: amount of user data to be sent
876  *
877  * Message must have an destination specified explicitly.
878  * Used for SOCK_RDM and SOCK_DGRAM messages,
879  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
880  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
881  *
882  * Returns the number of bytes sent on success, or errno otherwise
883  */
884 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
885                         struct msghdr *m, size_t dsz)
886 {
887         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
888         struct sock *sk = sock->sk;
889         struct tipc_sock *tsk = tipc_sk(sk);
890         struct net *net = sock_net(sk);
891         struct tipc_msg *mhdr = &tsk->phdr;
892         u32 dnode, dport;
893         struct sk_buff_head head;
894         struct sk_buff *skb;
895         struct tipc_name_seq *seq = &dest->addr.nameseq;
896         u32 mtu;
897         long timeo;
898         int rc;
899
900         if (unlikely(!dest))
901                 return -EDESTADDRREQ;
902
903         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
904                      (dest->family != AF_TIPC)))
905                 return -EINVAL;
906
907         if (dsz > TIPC_MAX_USER_MSG_SIZE)
908                 return -EMSGSIZE;
909
910         if (iocb)
911                 lock_sock(sk);
912
913         if (unlikely(sock->state != SS_READY)) {
914                 if (sock->state == SS_LISTENING) {
915                         rc = -EPIPE;
916                         goto exit;
917                 }
918                 if (sock->state != SS_UNCONNECTED) {
919                         rc = -EISCONN;
920                         goto exit;
921                 }
922                 if (tsk->published) {
923                         rc = -EOPNOTSUPP;
924                         goto exit;
925                 }
926                 if (dest->addrtype == TIPC_ADDR_NAME) {
927                         tsk->conn_type = dest->addr.name.name.type;
928                         tsk->conn_instance = dest->addr.name.name.instance;
929                 }
930         }
931
932         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
933
934         if (dest->addrtype == TIPC_ADDR_MCAST) {
935                 rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
936                 goto exit;
937         } else if (dest->addrtype == TIPC_ADDR_NAME) {
938                 u32 type = dest->addr.name.name.type;
939                 u32 inst = dest->addr.name.name.instance;
940                 u32 domain = dest->addr.name.domain;
941
942                 dnode = domain;
943                 msg_set_type(mhdr, TIPC_NAMED_MSG);
944                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
945                 msg_set_nametype(mhdr, type);
946                 msg_set_nameinst(mhdr, inst);
947                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
948                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
949                 msg_set_destnode(mhdr, dnode);
950                 msg_set_destport(mhdr, dport);
951                 if (unlikely(!dport && !dnode)) {
952                         rc = -EHOSTUNREACH;
953                         goto exit;
954                 }
955         } else if (dest->addrtype == TIPC_ADDR_ID) {
956                 dnode = dest->addr.id.node;
957                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
958                 msg_set_lookup_scope(mhdr, 0);
959                 msg_set_destnode(mhdr, dnode);
960                 msg_set_destport(mhdr, dest->addr.id.ref);
961                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
962         }
963
964 new_mtu:
965         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
966         __skb_queue_head_init(&head);
967         rc = tipc_msg_build(net, mhdr, m, 0, dsz, mtu, &head);
968         if (rc < 0)
969                 goto exit;
970
971         do {
972                 skb = skb_peek(&head);
973                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
974                 rc = tipc_link_xmit(net, &head, dnode, tsk->portid);
975                 if (likely(rc >= 0)) {
976                         if (sock->state != SS_READY)
977                                 sock->state = SS_CONNECTING;
978                         rc = dsz;
979                         break;
980                 }
981                 if (rc == -EMSGSIZE)
982                         goto new_mtu;
983                 if (rc != -ELINKCONG)
984                         break;
985                 tsk->link_cong = 1;
986                 rc = tipc_wait_for_sndmsg(sock, &timeo);
987                 if (rc)
988                         __skb_queue_purge(&head);
989         } while (!rc);
990 exit:
991         if (iocb)
992                 release_sock(sk);
993
994         return rc;
995 }
996
997 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
998 {
999         struct sock *sk = sock->sk;
1000         struct tipc_sock *tsk = tipc_sk(sk);
1001         DEFINE_WAIT(wait);
1002         int done;
1003
1004         do {
1005                 int err = sock_error(sk);
1006                 if (err)
1007                         return err;
1008                 if (sock->state == SS_DISCONNECTING)
1009                         return -EPIPE;
1010                 else if (sock->state != SS_CONNECTED)
1011                         return -ENOTCONN;
1012                 if (!*timeo_p)
1013                         return -EAGAIN;
1014                 if (signal_pending(current))
1015                         return sock_intr_errno(*timeo_p);
1016
1017                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1018                 done = sk_wait_event(sk, timeo_p,
1019                                      (!tsk->link_cong &&
1020                                       !tsk_conn_cong(tsk)) ||
1021                                      !tsk->connected);
1022                 finish_wait(sk_sleep(sk), &wait);
1023         } while (!done);
1024         return 0;
1025 }
1026
1027 /**
1028  * tipc_send_stream - send stream-oriented data
1029  * @iocb: (unused)
1030  * @sock: socket structure
1031  * @m: data to send
1032  * @dsz: total length of data to be transmitted
1033  *
1034  * Used for SOCK_STREAM data.
1035  *
1036  * Returns the number of bytes sent on success (or partial success),
1037  * or errno if no data sent
1038  */
1039 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1040                             struct msghdr *m, size_t dsz)
1041 {
1042         struct sock *sk = sock->sk;
1043         struct net *net = sock_net(sk);
1044         struct tipc_sock *tsk = tipc_sk(sk);
1045         struct tipc_msg *mhdr = &tsk->phdr;
1046         struct sk_buff_head head;
1047         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1048         u32 portid = tsk->portid;
1049         int rc = -EINVAL;
1050         long timeo;
1051         u32 dnode;
1052         uint mtu, send, sent = 0;
1053
1054         /* Handle implied connection establishment */
1055         if (unlikely(dest)) {
1056                 rc = tipc_sendmsg(iocb, sock, m, dsz);
1057                 if (dsz && (dsz == rc))
1058                         tsk->sent_unacked = 1;
1059                 return rc;
1060         }
1061         if (dsz > (uint)INT_MAX)
1062                 return -EMSGSIZE;
1063
1064         if (iocb)
1065                 lock_sock(sk);
1066
1067         if (unlikely(sock->state != SS_CONNECTED)) {
1068                 if (sock->state == SS_DISCONNECTING)
1069                         rc = -EPIPE;
1070                 else
1071                         rc = -ENOTCONN;
1072                 goto exit;
1073         }
1074
1075         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1076         dnode = tsk_peer_node(tsk);
1077
1078 next:
1079         mtu = tsk->max_pkt;
1080         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1081         __skb_queue_head_init(&head);
1082         rc = tipc_msg_build(net, mhdr, m, sent, send, mtu, &head);
1083         if (unlikely(rc < 0))
1084                 goto exit;
1085         do {
1086                 if (likely(!tsk_conn_cong(tsk))) {
1087                         rc = tipc_link_xmit(net, &head, dnode, portid);
1088                         if (likely(!rc)) {
1089                                 tsk->sent_unacked++;
1090                                 sent += send;
1091                                 if (sent == dsz)
1092                                         break;
1093                                 goto next;
1094                         }
1095                         if (rc == -EMSGSIZE) {
1096                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1097                                                                  portid);
1098                                 goto next;
1099                         }
1100                         if (rc != -ELINKCONG)
1101                                 break;
1102                         tsk->link_cong = 1;
1103                 }
1104                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1105                 if (rc)
1106                         __skb_queue_purge(&head);
1107         } while (!rc);
1108 exit:
1109         if (iocb)
1110                 release_sock(sk);
1111         return sent ? sent : rc;
1112 }
1113
1114 /**
1115  * tipc_send_packet - send a connection-oriented message
1116  * @iocb: if NULL, indicates that socket lock is already held
1117  * @sock: socket structure
1118  * @m: message to send
1119  * @dsz: length of data to be transmitted
1120  *
1121  * Used for SOCK_SEQPACKET messages.
1122  *
1123  * Returns the number of bytes sent on success, or errno otherwise
1124  */
1125 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
1126                             struct msghdr *m, size_t dsz)
1127 {
1128         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1129                 return -EMSGSIZE;
1130
1131         return tipc_send_stream(iocb, sock, m, dsz);
1132 }
1133
1134 /* tipc_sk_finish_conn - complete the setup of a connection
1135  */
1136 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1137                                 u32 peer_node)
1138 {
1139         struct net *net = sock_net(&tsk->sk);
1140         struct tipc_msg *msg = &tsk->phdr;
1141
1142         msg_set_destnode(msg, peer_node);
1143         msg_set_destport(msg, peer_port);
1144         msg_set_type(msg, TIPC_CONN_MSG);
1145         msg_set_lookup_scope(msg, 0);
1146         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1147
1148         tsk->probing_intv = CONN_PROBING_INTERVAL;
1149         tsk->probing_state = TIPC_CONN_OK;
1150         tsk->connected = 1;
1151         if (!mod_timer(&tsk->timer, jiffies + tsk->probing_intv))
1152                 sock_hold(&tsk->sk);
1153         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1154         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1155 }
1156
1157 /**
1158  * set_orig_addr - capture sender's address for received message
1159  * @m: descriptor for message info
1160  * @msg: received message header
1161  *
1162  * Note: Address is not captured if not requested by receiver.
1163  */
1164 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1165 {
1166         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1167
1168         if (addr) {
1169                 addr->family = AF_TIPC;
1170                 addr->addrtype = TIPC_ADDR_ID;
1171                 memset(&addr->addr, 0, sizeof(addr->addr));
1172                 addr->addr.id.ref = msg_origport(msg);
1173                 addr->addr.id.node = msg_orignode(msg);
1174                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1175                 addr->scope = 0;                /* could leave uninitialized */
1176                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1177         }
1178 }
1179
1180 /**
1181  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1182  * @m: descriptor for message info
1183  * @msg: received message header
1184  * @tsk: TIPC port associated with message
1185  *
1186  * Note: Ancillary data is not captured if not requested by receiver.
1187  *
1188  * Returns 0 if successful, otherwise errno
1189  */
1190 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1191                                  struct tipc_sock *tsk)
1192 {
1193         u32 anc_data[3];
1194         u32 err;
1195         u32 dest_type;
1196         int has_name;
1197         int res;
1198
1199         if (likely(m->msg_controllen == 0))
1200                 return 0;
1201
1202         /* Optionally capture errored message object(s) */
1203         err = msg ? msg_errcode(msg) : 0;
1204         if (unlikely(err)) {
1205                 anc_data[0] = err;
1206                 anc_data[1] = msg_data_sz(msg);
1207                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1208                 if (res)
1209                         return res;
1210                 if (anc_data[1]) {
1211                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1212                                        msg_data(msg));
1213                         if (res)
1214                                 return res;
1215                 }
1216         }
1217
1218         /* Optionally capture message destination object */
1219         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1220         switch (dest_type) {
1221         case TIPC_NAMED_MSG:
1222                 has_name = 1;
1223                 anc_data[0] = msg_nametype(msg);
1224                 anc_data[1] = msg_namelower(msg);
1225                 anc_data[2] = msg_namelower(msg);
1226                 break;
1227         case TIPC_MCAST_MSG:
1228                 has_name = 1;
1229                 anc_data[0] = msg_nametype(msg);
1230                 anc_data[1] = msg_namelower(msg);
1231                 anc_data[2] = msg_nameupper(msg);
1232                 break;
1233         case TIPC_CONN_MSG:
1234                 has_name = (tsk->conn_type != 0);
1235                 anc_data[0] = tsk->conn_type;
1236                 anc_data[1] = tsk->conn_instance;
1237                 anc_data[2] = tsk->conn_instance;
1238                 break;
1239         default:
1240                 has_name = 0;
1241         }
1242         if (has_name) {
1243                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1244                 if (res)
1245                         return res;
1246         }
1247
1248         return 0;
1249 }
1250
1251 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1252 {
1253         struct net *net = sock_net(&tsk->sk);
1254         struct tipc_net *tn = net_generic(net, tipc_net_id);
1255         struct sk_buff *skb = NULL;
1256         struct tipc_msg *msg;
1257         u32 peer_port = tsk_peer_port(tsk);
1258         u32 dnode = tsk_peer_node(tsk);
1259
1260         if (!tsk->connected)
1261                 return;
1262         skb = tipc_msg_create(net, CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1263                               dnode, tn->own_addr, peer_port, tsk->portid,
1264                               TIPC_OK);
1265         if (!skb)
1266                 return;
1267         msg = buf_msg(skb);
1268         msg_set_msgcnt(msg, ack);
1269         tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1270 }
1271
1272 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1273 {
1274         struct sock *sk = sock->sk;
1275         DEFINE_WAIT(wait);
1276         long timeo = *timeop;
1277         int err;
1278
1279         for (;;) {
1280                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1281                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1282                         if (sock->state == SS_DISCONNECTING) {
1283                                 err = -ENOTCONN;
1284                                 break;
1285                         }
1286                         release_sock(sk);
1287                         timeo = schedule_timeout(timeo);
1288                         lock_sock(sk);
1289                 }
1290                 err = 0;
1291                 if (!skb_queue_empty(&sk->sk_receive_queue))
1292                         break;
1293                 err = sock_intr_errno(timeo);
1294                 if (signal_pending(current))
1295                         break;
1296                 err = -EAGAIN;
1297                 if (!timeo)
1298                         break;
1299         }
1300         finish_wait(sk_sleep(sk), &wait);
1301         *timeop = timeo;
1302         return err;
1303 }
1304
1305 /**
1306  * tipc_recvmsg - receive packet-oriented message
1307  * @iocb: (unused)
1308  * @m: descriptor for message info
1309  * @buf_len: total size of user buffer area
1310  * @flags: receive flags
1311  *
1312  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1313  * If the complete message doesn't fit in user area, truncate it.
1314  *
1315  * Returns size of returned message data, errno otherwise
1316  */
1317 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1318                         struct msghdr *m, size_t buf_len, int flags)
1319 {
1320         struct sock *sk = sock->sk;
1321         struct tipc_sock *tsk = tipc_sk(sk);
1322         struct sk_buff *buf;
1323         struct tipc_msg *msg;
1324         long timeo;
1325         unsigned int sz;
1326         u32 err;
1327         int res;
1328
1329         /* Catch invalid receive requests */
1330         if (unlikely(!buf_len))
1331                 return -EINVAL;
1332
1333         lock_sock(sk);
1334
1335         if (unlikely(sock->state == SS_UNCONNECTED)) {
1336                 res = -ENOTCONN;
1337                 goto exit;
1338         }
1339
1340         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1341 restart:
1342
1343         /* Look for a message in receive queue; wait if necessary */
1344         res = tipc_wait_for_rcvmsg(sock, &timeo);
1345         if (res)
1346                 goto exit;
1347
1348         /* Look at first message in receive queue */
1349         buf = skb_peek(&sk->sk_receive_queue);
1350         msg = buf_msg(buf);
1351         sz = msg_data_sz(msg);
1352         err = msg_errcode(msg);
1353
1354         /* Discard an empty non-errored message & try again */
1355         if ((!sz) && (!err)) {
1356                 tsk_advance_rx_queue(sk);
1357                 goto restart;
1358         }
1359
1360         /* Capture sender's address (optional) */
1361         set_orig_addr(m, msg);
1362
1363         /* Capture ancillary data (optional) */
1364         res = tipc_sk_anc_data_recv(m, msg, tsk);
1365         if (res)
1366                 goto exit;
1367
1368         /* Capture message data (if valid) & compute return value (always) */
1369         if (!err) {
1370                 if (unlikely(buf_len < sz)) {
1371                         sz = buf_len;
1372                         m->msg_flags |= MSG_TRUNC;
1373                 }
1374                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1375                 if (res)
1376                         goto exit;
1377                 res = sz;
1378         } else {
1379                 if ((sock->state == SS_READY) ||
1380                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1381                         res = 0;
1382                 else
1383                         res = -ECONNRESET;
1384         }
1385
1386         /* Consume received message (optional) */
1387         if (likely(!(flags & MSG_PEEK))) {
1388                 if ((sock->state != SS_READY) &&
1389                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1390                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1391                         tsk->rcv_unacked = 0;
1392                 }
1393                 tsk_advance_rx_queue(sk);
1394         }
1395 exit:
1396         release_sock(sk);
1397         return res;
1398 }
1399
1400 /**
1401  * tipc_recv_stream - receive stream-oriented data
1402  * @iocb: (unused)
1403  * @m: descriptor for message info
1404  * @buf_len: total size of user buffer area
1405  * @flags: receive flags
1406  *
1407  * Used for SOCK_STREAM messages only.  If not enough data is available
1408  * will optionally wait for more; never truncates data.
1409  *
1410  * Returns size of returned message data, errno otherwise
1411  */
1412 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1413                             struct msghdr *m, size_t buf_len, int flags)
1414 {
1415         struct sock *sk = sock->sk;
1416         struct tipc_sock *tsk = tipc_sk(sk);
1417         struct sk_buff *buf;
1418         struct tipc_msg *msg;
1419         long timeo;
1420         unsigned int sz;
1421         int sz_to_copy, target, needed;
1422         int sz_copied = 0;
1423         u32 err;
1424         int res = 0;
1425
1426         /* Catch invalid receive attempts */
1427         if (unlikely(!buf_len))
1428                 return -EINVAL;
1429
1430         lock_sock(sk);
1431
1432         if (unlikely(sock->state == SS_UNCONNECTED)) {
1433                 res = -ENOTCONN;
1434                 goto exit;
1435         }
1436
1437         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1438         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1439
1440 restart:
1441         /* Look for a message in receive queue; wait if necessary */
1442         res = tipc_wait_for_rcvmsg(sock, &timeo);
1443         if (res)
1444                 goto exit;
1445
1446         /* Look at first message in receive queue */
1447         buf = skb_peek(&sk->sk_receive_queue);
1448         msg = buf_msg(buf);
1449         sz = msg_data_sz(msg);
1450         err = msg_errcode(msg);
1451
1452         /* Discard an empty non-errored message & try again */
1453         if ((!sz) && (!err)) {
1454                 tsk_advance_rx_queue(sk);
1455                 goto restart;
1456         }
1457
1458         /* Optionally capture sender's address & ancillary data of first msg */
1459         if (sz_copied == 0) {
1460                 set_orig_addr(m, msg);
1461                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1462                 if (res)
1463                         goto exit;
1464         }
1465
1466         /* Capture message data (if valid) & compute return value (always) */
1467         if (!err) {
1468                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1469
1470                 sz -= offset;
1471                 needed = (buf_len - sz_copied);
1472                 sz_to_copy = (sz <= needed) ? sz : needed;
1473
1474                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1475                                             m, sz_to_copy);
1476                 if (res)
1477                         goto exit;
1478
1479                 sz_copied += sz_to_copy;
1480
1481                 if (sz_to_copy < sz) {
1482                         if (!(flags & MSG_PEEK))
1483                                 TIPC_SKB_CB(buf)->handle =
1484                                 (void *)(unsigned long)(offset + sz_to_copy);
1485                         goto exit;
1486                 }
1487         } else {
1488                 if (sz_copied != 0)
1489                         goto exit; /* can't add error msg to valid data */
1490
1491                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1492                         res = 0;
1493                 else
1494                         res = -ECONNRESET;
1495         }
1496
1497         /* Consume received message (optional) */
1498         if (likely(!(flags & MSG_PEEK))) {
1499                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1500                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1501                         tsk->rcv_unacked = 0;
1502                 }
1503                 tsk_advance_rx_queue(sk);
1504         }
1505
1506         /* Loop around if more data is required */
1507         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1508             (!skb_queue_empty(&sk->sk_receive_queue) ||
1509             (sz_copied < target)) &&    /* and more is ready or required */
1510             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1511             (!err))                     /* and haven't reached a FIN */
1512                 goto restart;
1513
1514 exit:
1515         release_sock(sk);
1516         return sz_copied ? sz_copied : res;
1517 }
1518
1519 /**
1520  * tipc_write_space - wake up thread if port congestion is released
1521  * @sk: socket
1522  */
1523 static void tipc_write_space(struct sock *sk)
1524 {
1525         struct socket_wq *wq;
1526
1527         rcu_read_lock();
1528         wq = rcu_dereference(sk->sk_wq);
1529         if (wq_has_sleeper(wq))
1530                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1531                                                 POLLWRNORM | POLLWRBAND);
1532         rcu_read_unlock();
1533 }
1534
1535 /**
1536  * tipc_data_ready - wake up threads to indicate messages have been received
1537  * @sk: socket
1538  * @len: the length of messages
1539  */
1540 static void tipc_data_ready(struct sock *sk)
1541 {
1542         struct socket_wq *wq;
1543
1544         rcu_read_lock();
1545         wq = rcu_dereference(sk->sk_wq);
1546         if (wq_has_sleeper(wq))
1547                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1548                                                 POLLRDNORM | POLLRDBAND);
1549         rcu_read_unlock();
1550 }
1551
1552 /**
1553  * filter_connect - Handle all incoming messages for a connection-based socket
1554  * @tsk: TIPC socket
1555  * @msg: message
1556  *
1557  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1558  */
1559 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1560 {
1561         struct sock *sk = &tsk->sk;
1562         struct net *net = sock_net(sk);
1563         struct socket *sock = sk->sk_socket;
1564         struct tipc_msg *msg = buf_msg(*buf);
1565         int retval = -TIPC_ERR_NO_PORT;
1566
1567         if (msg_mcast(msg))
1568                 return retval;
1569
1570         switch ((int)sock->state) {
1571         case SS_CONNECTED:
1572                 /* Accept only connection-based messages sent by peer */
1573                 if (tsk_peer_msg(tsk, msg)) {
1574                         if (unlikely(msg_errcode(msg))) {
1575                                 sock->state = SS_DISCONNECTING;
1576                                 tsk->connected = 0;
1577                                 /* let timer expire on it's own */
1578                                 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1579                                                       tsk->portid);
1580                         }
1581                         retval = TIPC_OK;
1582                 }
1583                 break;
1584         case SS_CONNECTING:
1585                 /* Accept only ACK or NACK message */
1586
1587                 if (unlikely(!msg_connected(msg)))
1588                         break;
1589
1590                 if (unlikely(msg_errcode(msg))) {
1591                         sock->state = SS_DISCONNECTING;
1592                         sk->sk_err = ECONNREFUSED;
1593                         retval = TIPC_OK;
1594                         break;
1595                 }
1596
1597                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1598                         sock->state = SS_DISCONNECTING;
1599                         sk->sk_err = EINVAL;
1600                         retval = TIPC_OK;
1601                         break;
1602                 }
1603
1604                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1605                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1606                 sock->state = SS_CONNECTED;
1607
1608                 /* If an incoming message is an 'ACK-', it should be
1609                  * discarded here because it doesn't contain useful
1610                  * data. In addition, we should try to wake up
1611                  * connect() routine if sleeping.
1612                  */
1613                 if (msg_data_sz(msg) == 0) {
1614                         kfree_skb(*buf);
1615                         *buf = NULL;
1616                         if (waitqueue_active(sk_sleep(sk)))
1617                                 wake_up_interruptible(sk_sleep(sk));
1618                 }
1619                 retval = TIPC_OK;
1620                 break;
1621         case SS_LISTENING:
1622         case SS_UNCONNECTED:
1623                 /* Accept only SYN message */
1624                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1625                         retval = TIPC_OK;
1626                 break;
1627         case SS_DISCONNECTING:
1628                 break;
1629         default:
1630                 pr_err("Unknown socket state %u\n", sock->state);
1631         }
1632         return retval;
1633 }
1634
1635 /**
1636  * rcvbuf_limit - get proper overload limit of socket receive queue
1637  * @sk: socket
1638  * @buf: message
1639  *
1640  * For all connection oriented messages, irrespective of importance,
1641  * the default overload value (i.e. 67MB) is set as limit.
1642  *
1643  * For all connectionless messages, by default new queue limits are
1644  * as belows:
1645  *
1646  * TIPC_LOW_IMPORTANCE       (4 MB)
1647  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1648  * TIPC_HIGH_IMPORTANCE      (16 MB)
1649  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1650  *
1651  * Returns overload limit according to corresponding message importance
1652  */
1653 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1654 {
1655         struct tipc_msg *msg = buf_msg(buf);
1656
1657         if (msg_connected(msg))
1658                 return sysctl_tipc_rmem[2];
1659
1660         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1661                 msg_importance(msg);
1662 }
1663
1664 /**
1665  * filter_rcv - validate incoming message
1666  * @sk: socket
1667  * @buf: message
1668  *
1669  * Enqueues message on receive queue if acceptable; optionally handles
1670  * disconnect indication for a connected socket.
1671  *
1672  * Called with socket lock already taken; port lock may also be taken.
1673  *
1674  * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
1675  * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
1676  */
1677 static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1678 {
1679         struct socket *sock = sk->sk_socket;
1680         struct tipc_sock *tsk = tipc_sk(sk);
1681         struct tipc_msg *msg = buf_msg(buf);
1682         unsigned int limit = rcvbuf_limit(sk, buf);
1683         u32 onode;
1684         int rc = TIPC_OK;
1685
1686         if (unlikely(msg_user(msg) == CONN_MANAGER))
1687                 return tipc_sk_proto_rcv(tsk, &onode, buf);
1688
1689         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1690                 kfree_skb(buf);
1691                 tsk->link_cong = 0;
1692                 sk->sk_write_space(sk);
1693                 return TIPC_OK;
1694         }
1695
1696         /* Reject message if it is wrong sort of message for socket */
1697         if (msg_type(msg) > TIPC_DIRECT_MSG)
1698                 return -TIPC_ERR_NO_PORT;
1699
1700         if (sock->state == SS_READY) {
1701                 if (msg_connected(msg))
1702                         return -TIPC_ERR_NO_PORT;
1703         } else {
1704                 rc = filter_connect(tsk, &buf);
1705                 if (rc != TIPC_OK || buf == NULL)
1706                         return rc;
1707         }
1708
1709         /* Reject message if there isn't room to queue it */
1710         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1711                 return -TIPC_ERR_OVERLOAD;
1712
1713         /* Enqueue message */
1714         TIPC_SKB_CB(buf)->handle = NULL;
1715         __skb_queue_tail(&sk->sk_receive_queue, buf);
1716         skb_set_owner_r(buf, sk);
1717
1718         sk->sk_data_ready(sk);
1719         return TIPC_OK;
1720 }
1721
1722 /**
1723  * tipc_backlog_rcv - handle incoming message from backlog queue
1724  * @sk: socket
1725  * @skb: message
1726  *
1727  * Caller must hold socket lock, but not port lock.
1728  *
1729  * Returns 0
1730  */
1731 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1732 {
1733         int rc;
1734         u32 onode;
1735         struct tipc_sock *tsk = tipc_sk(sk);
1736         struct net *net = sock_net(sk);
1737         uint truesize = skb->truesize;
1738
1739         rc = filter_rcv(sk, skb);
1740
1741         if (likely(!rc)) {
1742                 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1743                         atomic_add(truesize, &tsk->dupl_rcvcnt);
1744                 return 0;
1745         }
1746
1747         if ((rc < 0) && !tipc_msg_reverse(net, skb, &onode, -rc))
1748                 return 0;
1749
1750         tipc_link_xmit_skb(net, skb, onode, 0);
1751
1752         return 0;
1753 }
1754
1755 /**
1756  * tipc_sk_rcv - handle incoming message
1757  * @skb: buffer containing arriving message
1758  * Consumes buffer
1759  * Returns 0 if success, or errno: -EHOSTUNREACH
1760  */
1761 int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
1762 {
1763         struct tipc_sock *tsk;
1764         struct sock *sk;
1765         u32 dport = msg_destport(buf_msg(skb));
1766         int rc = TIPC_OK;
1767         uint limit;
1768         u32 dnode;
1769
1770         /* Validate destination and message */
1771         tsk = tipc_sk_lookup(net, dport);
1772         if (unlikely(!tsk)) {
1773                 rc = tipc_msg_eval(net, skb, &dnode);
1774                 goto exit;
1775         }
1776         sk = &tsk->sk;
1777
1778         /* Queue message */
1779         spin_lock_bh(&sk->sk_lock.slock);
1780
1781         if (!sock_owned_by_user(sk)) {
1782                 rc = filter_rcv(sk, skb);
1783         } else {
1784                 if (sk->sk_backlog.len == 0)
1785                         atomic_set(&tsk->dupl_rcvcnt, 0);
1786                 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1787                 if (sk_add_backlog(sk, skb, limit))
1788                         rc = -TIPC_ERR_OVERLOAD;
1789         }
1790         spin_unlock_bh(&sk->sk_lock.slock);
1791         sock_put(sk);
1792         if (likely(!rc))
1793                 return 0;
1794 exit:
1795         if ((rc < 0) && !tipc_msg_reverse(net, skb, &dnode, -rc))
1796                 return -EHOSTUNREACH;
1797
1798         tipc_link_xmit_skb(net, skb, dnode, 0);
1799         return (rc < 0) ? -EHOSTUNREACH : 0;
1800 }
1801
1802 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1803 {
1804         struct sock *sk = sock->sk;
1805         DEFINE_WAIT(wait);
1806         int done;
1807
1808         do {
1809                 int err = sock_error(sk);
1810                 if (err)
1811                         return err;
1812                 if (!*timeo_p)
1813                         return -ETIMEDOUT;
1814                 if (signal_pending(current))
1815                         return sock_intr_errno(*timeo_p);
1816
1817                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1818                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1819                 finish_wait(sk_sleep(sk), &wait);
1820         } while (!done);
1821         return 0;
1822 }
1823
1824 /**
1825  * tipc_connect - establish a connection to another TIPC port
1826  * @sock: socket structure
1827  * @dest: socket address for destination port
1828  * @destlen: size of socket address data structure
1829  * @flags: file-related flags associated with socket
1830  *
1831  * Returns 0 on success, errno otherwise
1832  */
1833 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1834                         int destlen, int flags)
1835 {
1836         struct sock *sk = sock->sk;
1837         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1838         struct msghdr m = {NULL,};
1839         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1840         socket_state previous;
1841         int res;
1842
1843         lock_sock(sk);
1844
1845         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1846         if (sock->state == SS_READY) {
1847                 res = -EOPNOTSUPP;
1848                 goto exit;
1849         }
1850
1851         /*
1852          * Reject connection attempt using multicast address
1853          *
1854          * Note: send_msg() validates the rest of the address fields,
1855          *       so there's no need to do it here
1856          */
1857         if (dst->addrtype == TIPC_ADDR_MCAST) {
1858                 res = -EINVAL;
1859                 goto exit;
1860         }
1861
1862         previous = sock->state;
1863         switch (sock->state) {
1864         case SS_UNCONNECTED:
1865                 /* Send a 'SYN-' to destination */
1866                 m.msg_name = dest;
1867                 m.msg_namelen = destlen;
1868
1869                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1870                  * indicate send_msg() is never blocked.
1871                  */
1872                 if (!timeout)
1873                         m.msg_flags = MSG_DONTWAIT;
1874
1875                 res = tipc_sendmsg(NULL, sock, &m, 0);
1876                 if ((res < 0) && (res != -EWOULDBLOCK))
1877                         goto exit;
1878
1879                 /* Just entered SS_CONNECTING state; the only
1880                  * difference is that return value in non-blocking
1881                  * case is EINPROGRESS, rather than EALREADY.
1882                  */
1883                 res = -EINPROGRESS;
1884         case SS_CONNECTING:
1885                 if (previous == SS_CONNECTING)
1886                         res = -EALREADY;
1887                 if (!timeout)
1888                         goto exit;
1889                 timeout = msecs_to_jiffies(timeout);
1890                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1891                 res = tipc_wait_for_connect(sock, &timeout);
1892                 break;
1893         case SS_CONNECTED:
1894                 res = -EISCONN;
1895                 break;
1896         default:
1897                 res = -EINVAL;
1898                 break;
1899         }
1900 exit:
1901         release_sock(sk);
1902         return res;
1903 }
1904
1905 /**
1906  * tipc_listen - allow socket to listen for incoming connections
1907  * @sock: socket structure
1908  * @len: (unused)
1909  *
1910  * Returns 0 on success, errno otherwise
1911  */
1912 static int tipc_listen(struct socket *sock, int len)
1913 {
1914         struct sock *sk = sock->sk;
1915         int res;
1916
1917         lock_sock(sk);
1918
1919         if (sock->state != SS_UNCONNECTED)
1920                 res = -EINVAL;
1921         else {
1922                 sock->state = SS_LISTENING;
1923                 res = 0;
1924         }
1925
1926         release_sock(sk);
1927         return res;
1928 }
1929
1930 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1931 {
1932         struct sock *sk = sock->sk;
1933         DEFINE_WAIT(wait);
1934         int err;
1935
1936         /* True wake-one mechanism for incoming connections: only
1937          * one process gets woken up, not the 'whole herd'.
1938          * Since we do not 'race & poll' for established sockets
1939          * anymore, the common case will execute the loop only once.
1940         */
1941         for (;;) {
1942                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1943                                           TASK_INTERRUPTIBLE);
1944                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1945                         release_sock(sk);
1946                         timeo = schedule_timeout(timeo);
1947                         lock_sock(sk);
1948                 }
1949                 err = 0;
1950                 if (!skb_queue_empty(&sk->sk_receive_queue))
1951                         break;
1952                 err = -EINVAL;
1953                 if (sock->state != SS_LISTENING)
1954                         break;
1955                 err = sock_intr_errno(timeo);
1956                 if (signal_pending(current))
1957                         break;
1958                 err = -EAGAIN;
1959                 if (!timeo)
1960                         break;
1961         }
1962         finish_wait(sk_sleep(sk), &wait);
1963         return err;
1964 }
1965
1966 /**
1967  * tipc_accept - wait for connection request
1968  * @sock: listening socket
1969  * @newsock: new socket that is to be connected
1970  * @flags: file-related flags associated with socket
1971  *
1972  * Returns 0 on success, errno otherwise
1973  */
1974 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1975 {
1976         struct sock *new_sk, *sk = sock->sk;
1977         struct sk_buff *buf;
1978         struct tipc_sock *new_tsock;
1979         struct tipc_msg *msg;
1980         long timeo;
1981         int res;
1982
1983         lock_sock(sk);
1984
1985         if (sock->state != SS_LISTENING) {
1986                 res = -EINVAL;
1987                 goto exit;
1988         }
1989         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1990         res = tipc_wait_for_accept(sock, timeo);
1991         if (res)
1992                 goto exit;
1993
1994         buf = skb_peek(&sk->sk_receive_queue);
1995
1996         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1997         if (res)
1998                 goto exit;
1999
2000         new_sk = new_sock->sk;
2001         new_tsock = tipc_sk(new_sk);
2002         msg = buf_msg(buf);
2003
2004         /* we lock on new_sk; but lockdep sees the lock on sk */
2005         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2006
2007         /*
2008          * Reject any stray messages received by new socket
2009          * before the socket lock was taken (very, very unlikely)
2010          */
2011         tsk_rej_rx_queue(new_sk);
2012
2013         /* Connect new socket to it's peer */
2014         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2015         new_sock->state = SS_CONNECTED;
2016
2017         tsk_set_importance(new_tsock, msg_importance(msg));
2018         if (msg_named(msg)) {
2019                 new_tsock->conn_type = msg_nametype(msg);
2020                 new_tsock->conn_instance = msg_nameinst(msg);
2021         }
2022
2023         /*
2024          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2025          * Respond to 'SYN+' by queuing it on new socket.
2026          */
2027         if (!msg_data_sz(msg)) {
2028                 struct msghdr m = {NULL,};
2029
2030                 tsk_advance_rx_queue(sk);
2031                 tipc_send_packet(NULL, new_sock, &m, 0);
2032         } else {
2033                 __skb_dequeue(&sk->sk_receive_queue);
2034                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2035                 skb_set_owner_r(buf, new_sk);
2036         }
2037         release_sock(new_sk);
2038 exit:
2039         release_sock(sk);
2040         return res;
2041 }
2042
2043 /**
2044  * tipc_shutdown - shutdown socket connection
2045  * @sock: socket structure
2046  * @how: direction to close (must be SHUT_RDWR)
2047  *
2048  * Terminates connection (if necessary), then purges socket's receive queue.
2049  *
2050  * Returns 0 on success, errno otherwise
2051  */
2052 static int tipc_shutdown(struct socket *sock, int how)
2053 {
2054         struct sock *sk = sock->sk;
2055         struct net *net = sock_net(sk);
2056         struct tipc_net *tn = net_generic(net, tipc_net_id);
2057         struct tipc_sock *tsk = tipc_sk(sk);
2058         struct sk_buff *skb;
2059         u32 dnode;
2060         int res;
2061
2062         if (how != SHUT_RDWR)
2063                 return -EINVAL;
2064
2065         lock_sock(sk);
2066
2067         switch (sock->state) {
2068         case SS_CONNECTING:
2069         case SS_CONNECTED:
2070
2071 restart:
2072                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2073                 skb = __skb_dequeue(&sk->sk_receive_queue);
2074                 if (skb) {
2075                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2076                                 kfree_skb(skb);
2077                                 goto restart;
2078                         }
2079                         if (tipc_msg_reverse(net, skb, &dnode,
2080                                              TIPC_CONN_SHUTDOWN))
2081                                 tipc_link_xmit_skb(net, skb, dnode,
2082                                                    tsk->portid);
2083                         tipc_node_remove_conn(net, dnode, tsk->portid);
2084                 } else {
2085                         dnode = tsk_peer_node(tsk);
2086                         skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
2087                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2088                                               0, dnode, tn->own_addr,
2089                                               tsk_peer_port(tsk),
2090                                               tsk->portid, TIPC_CONN_SHUTDOWN);
2091                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
2092                 }
2093                 tsk->connected = 0;
2094                 sock->state = SS_DISCONNECTING;
2095                 tipc_node_remove_conn(net, dnode, tsk->portid);
2096                 /* fall through */
2097
2098         case SS_DISCONNECTING:
2099
2100                 /* Discard any unreceived messages */
2101                 __skb_queue_purge(&sk->sk_receive_queue);
2102
2103                 /* Wake up anyone sleeping in poll */
2104                 sk->sk_state_change(sk);
2105                 res = 0;
2106                 break;
2107
2108         default:
2109                 res = -ENOTCONN;
2110         }
2111
2112         release_sock(sk);
2113         return res;
2114 }
2115
2116 static void tipc_sk_timeout(unsigned long data)
2117 {
2118         struct tipc_sock *tsk = (struct tipc_sock *)data;
2119         struct sock *sk = &tsk->sk;
2120         struct net *net = sock_net(sk);
2121         struct tipc_net *tn = net_generic(net, tipc_net_id);
2122         struct sk_buff *skb = NULL;
2123         u32 peer_port, peer_node;
2124
2125         bh_lock_sock(sk);
2126         if (!tsk->connected) {
2127                 bh_unlock_sock(sk);
2128                 goto exit;
2129         }
2130         peer_port = tsk_peer_port(tsk);
2131         peer_node = tsk_peer_node(tsk);
2132
2133         if (tsk->probing_state == TIPC_CONN_PROBING) {
2134                 /* Previous probe not answered -> self abort */
2135                 skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
2136                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0,
2137                                       tn->own_addr, peer_node, tsk->portid,
2138                                       peer_port, TIPC_ERR_NO_PORT);
2139         } else {
2140                 skb = tipc_msg_create(net, CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2141                                       0, peer_node, tn->own_addr,
2142                                       peer_port, tsk->portid, TIPC_OK);
2143                 tsk->probing_state = TIPC_CONN_PROBING;
2144                 if (!mod_timer(&tsk->timer, jiffies + tsk->probing_intv))
2145                         sock_hold(sk);
2146         }
2147         bh_unlock_sock(sk);
2148         if (skb)
2149                 tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2150 exit:
2151         sock_put(sk);
2152 }
2153
2154 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2155                            struct tipc_name_seq const *seq)
2156 {
2157         struct net *net = sock_net(&tsk->sk);
2158         struct publication *publ;
2159         u32 key;
2160
2161         if (tsk->connected)
2162                 return -EINVAL;
2163         key = tsk->portid + tsk->pub_count + 1;
2164         if (key == tsk->portid)
2165                 return -EADDRINUSE;
2166
2167         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2168                                     scope, tsk->portid, key);
2169         if (unlikely(!publ))
2170                 return -EINVAL;
2171
2172         list_add(&publ->pport_list, &tsk->publications);
2173         tsk->pub_count++;
2174         tsk->published = 1;
2175         return 0;
2176 }
2177
2178 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2179                             struct tipc_name_seq const *seq)
2180 {
2181         struct net *net = sock_net(&tsk->sk);
2182         struct publication *publ;
2183         struct publication *safe;
2184         int rc = -EINVAL;
2185
2186         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2187                 if (seq) {
2188                         if (publ->scope != scope)
2189                                 continue;
2190                         if (publ->type != seq->type)
2191                                 continue;
2192                         if (publ->lower != seq->lower)
2193                                 continue;
2194                         if (publ->upper != seq->upper)
2195                                 break;
2196                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2197                                               publ->ref, publ->key);
2198                         rc = 0;
2199                         break;
2200                 }
2201                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2202                                       publ->ref, publ->key);
2203                 rc = 0;
2204         }
2205         if (list_empty(&tsk->publications))
2206                 tsk->published = 0;
2207         return rc;
2208 }
2209
2210 static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2211                         int len, int full_id)
2212 {
2213         struct net *net = sock_net(&tsk->sk);
2214         struct tipc_net *tn = net_generic(net, tipc_net_id);
2215         struct publication *publ;
2216         int ret;
2217
2218         if (full_id)
2219                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2220                                     tipc_zone(tn->own_addr),
2221                                     tipc_cluster(tn->own_addr),
2222                                     tipc_node(tn->own_addr), tsk->portid);
2223         else
2224                 ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
2225
2226         if (tsk->connected) {
2227                 u32 dport = tsk_peer_port(tsk);
2228                 u32 destnode = tsk_peer_node(tsk);
2229
2230                 ret += tipc_snprintf(buf + ret, len - ret,
2231                                      " connected to <%u.%u.%u:%u>",
2232                                      tipc_zone(destnode),
2233                                      tipc_cluster(destnode),
2234                                      tipc_node(destnode), dport);
2235                 if (tsk->conn_type != 0)
2236                         ret += tipc_snprintf(buf + ret, len - ret,
2237                                              " via {%u,%u}", tsk->conn_type,
2238                                              tsk->conn_instance);
2239         } else if (tsk->published) {
2240                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
2241                 list_for_each_entry(publ, &tsk->publications, pport_list) {
2242                         if (publ->lower == publ->upper)
2243                                 ret += tipc_snprintf(buf + ret, len - ret,
2244                                                      " {%u,%u}", publ->type,
2245                                                      publ->lower);
2246                         else
2247                                 ret += tipc_snprintf(buf + ret, len - ret,
2248                                                      " {%u,%u,%u}", publ->type,
2249                                                      publ->lower, publ->upper);
2250                 }
2251         }
2252         ret += tipc_snprintf(buf + ret, len - ret, "\n");
2253         return ret;
2254 }
2255
2256 struct sk_buff *tipc_sk_socks_show(struct net *net)
2257 {
2258         struct tipc_net *tn = net_generic(net, tipc_net_id);
2259         const struct bucket_table *tbl;
2260         struct rhash_head *pos;
2261         struct sk_buff *buf;
2262         struct tlv_desc *rep_tlv;
2263         char *pb;
2264         int pb_len;
2265         struct tipc_sock *tsk;
2266         int str_len = 0;
2267         int i;
2268
2269         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2270         if (!buf)
2271                 return NULL;
2272         rep_tlv = (struct tlv_desc *)buf->data;
2273         pb = TLV_DATA(rep_tlv);
2274         pb_len = ULTRA_STRING_MAX_LEN;
2275
2276         rcu_read_lock();
2277         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2278         for (i = 0; i < tbl->size; i++) {
2279                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2280                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2281                         str_len += tipc_sk_show(tsk, pb + str_len,
2282                                                 pb_len - str_len, 0);
2283                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2284                 }
2285         }
2286         rcu_read_unlock();
2287
2288         str_len += 1;   /* for "\0" */
2289         skb_put(buf, TLV_SPACE(str_len));
2290         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2291
2292         return buf;
2293 }
2294
2295 /* tipc_sk_reinit: set non-zero address in all existing sockets
2296  *                 when we go from standalone to network mode.
2297  */
2298 void tipc_sk_reinit(struct net *net)
2299 {
2300         struct tipc_net *tn = net_generic(net, tipc_net_id);
2301         const struct bucket_table *tbl;
2302         struct rhash_head *pos;
2303         struct tipc_sock *tsk;
2304         struct tipc_msg *msg;
2305         int i;
2306
2307         rcu_read_lock();
2308         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2309         for (i = 0; i < tbl->size; i++) {
2310                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2311                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2312                         msg = &tsk->phdr;
2313                         msg_set_prevnode(msg, tn->own_addr);
2314                         msg_set_orignode(msg, tn->own_addr);
2315                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2316                 }
2317         }
2318         rcu_read_unlock();
2319 }
2320
2321 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2322 {
2323         struct tipc_net *tn = net_generic(net, tipc_net_id);
2324         struct tipc_sock *tsk;
2325
2326         rcu_read_lock();
2327         tsk = rhashtable_lookup(&tn->sk_rht, &portid);
2328         if (tsk)
2329                 sock_hold(&tsk->sk);
2330         rcu_read_unlock();
2331
2332         return tsk;
2333 }
2334
2335 static int tipc_sk_insert(struct tipc_sock *tsk)
2336 {
2337         struct sock *sk = &tsk->sk;
2338         struct net *net = sock_net(sk);
2339         struct tipc_net *tn = net_generic(net, tipc_net_id);
2340         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2341         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2342
2343         while (remaining--) {
2344                 portid++;
2345                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2346                         portid = TIPC_MIN_PORT;
2347                 tsk->portid = portid;
2348                 sock_hold(&tsk->sk);
2349                 if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
2350                         return 0;
2351                 sock_put(&tsk->sk);
2352         }
2353
2354         return -1;
2355 }
2356
2357 static void tipc_sk_remove(struct tipc_sock *tsk)
2358 {
2359         struct sock *sk = &tsk->sk;
2360         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2361
2362         if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
2363                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2364                 __sock_put(sk);
2365         }
2366 }
2367
2368 int tipc_sk_rht_init(struct net *net)
2369 {
2370         struct tipc_net *tn = net_generic(net, tipc_net_id);
2371         struct rhashtable_params rht_params = {
2372                 .nelem_hint = 192,
2373                 .head_offset = offsetof(struct tipc_sock, node),
2374                 .key_offset = offsetof(struct tipc_sock, portid),
2375                 .key_len = sizeof(u32), /* portid */
2376                 .hashfn = jhash,
2377                 .max_shift = 20, /* 1M */
2378                 .min_shift = 8,  /* 256 */
2379                 .grow_decision = rht_grow_above_75,
2380                 .shrink_decision = rht_shrink_below_30,
2381         };
2382
2383         return rhashtable_init(&tn->sk_rht, &rht_params);
2384 }
2385
2386 void tipc_sk_rht_destroy(struct net *net)
2387 {
2388         struct tipc_net *tn = net_generic(net, tipc_net_id);
2389
2390         /* Wait for socket readers to complete */
2391         synchronize_net();
2392
2393         rhashtable_destroy(&tn->sk_rht);
2394 }
2395
2396 /**
2397  * tipc_setsockopt - set socket option
2398  * @sock: socket structure
2399  * @lvl: option level
2400  * @opt: option identifier
2401  * @ov: pointer to new option value
2402  * @ol: length of option value
2403  *
2404  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2405  * (to ease compatibility).
2406  *
2407  * Returns 0 on success, errno otherwise
2408  */
2409 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2410                            char __user *ov, unsigned int ol)
2411 {
2412         struct sock *sk = sock->sk;
2413         struct tipc_sock *tsk = tipc_sk(sk);
2414         u32 value;
2415         int res;
2416
2417         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2418                 return 0;
2419         if (lvl != SOL_TIPC)
2420                 return -ENOPROTOOPT;
2421         if (ol < sizeof(value))
2422                 return -EINVAL;
2423         res = get_user(value, (u32 __user *)ov);
2424         if (res)
2425                 return res;
2426
2427         lock_sock(sk);
2428
2429         switch (opt) {
2430         case TIPC_IMPORTANCE:
2431                 res = tsk_set_importance(tsk, value);
2432                 break;
2433         case TIPC_SRC_DROPPABLE:
2434                 if (sock->type != SOCK_STREAM)
2435                         tsk_set_unreliable(tsk, value);
2436                 else
2437                         res = -ENOPROTOOPT;
2438                 break;
2439         case TIPC_DEST_DROPPABLE:
2440                 tsk_set_unreturnable(tsk, value);
2441                 break;
2442         case TIPC_CONN_TIMEOUT:
2443                 tipc_sk(sk)->conn_timeout = value;
2444                 /* no need to set "res", since already 0 at this point */
2445                 break;
2446         default:
2447                 res = -EINVAL;
2448         }
2449
2450         release_sock(sk);
2451
2452         return res;
2453 }
2454
2455 /**
2456  * tipc_getsockopt - get socket option
2457  * @sock: socket structure
2458  * @lvl: option level
2459  * @opt: option identifier
2460  * @ov: receptacle for option value
2461  * @ol: receptacle for length of option value
2462  *
2463  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2464  * (to ease compatibility).
2465  *
2466  * Returns 0 on success, errno otherwise
2467  */
2468 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2469                            char __user *ov, int __user *ol)
2470 {
2471         struct sock *sk = sock->sk;
2472         struct tipc_sock *tsk = tipc_sk(sk);
2473         int len;
2474         u32 value;
2475         int res;
2476
2477         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2478                 return put_user(0, ol);
2479         if (lvl != SOL_TIPC)
2480                 return -ENOPROTOOPT;
2481         res = get_user(len, ol);
2482         if (res)
2483                 return res;
2484
2485         lock_sock(sk);
2486
2487         switch (opt) {
2488         case TIPC_IMPORTANCE:
2489                 value = tsk_importance(tsk);
2490                 break;
2491         case TIPC_SRC_DROPPABLE:
2492                 value = tsk_unreliable(tsk);
2493                 break;
2494         case TIPC_DEST_DROPPABLE:
2495                 value = tsk_unreturnable(tsk);
2496                 break;
2497         case TIPC_CONN_TIMEOUT:
2498                 value = tsk->conn_timeout;
2499                 /* no need to set "res", since already 0 at this point */
2500                 break;
2501         case TIPC_NODE_RECVQ_DEPTH:
2502                 value = 0; /* was tipc_queue_size, now obsolete */
2503                 break;
2504         case TIPC_SOCK_RECVQ_DEPTH:
2505                 value = skb_queue_len(&sk->sk_receive_queue);
2506                 break;
2507         default:
2508                 res = -EINVAL;
2509         }
2510
2511         release_sock(sk);
2512
2513         if (res)
2514                 return res;     /* "get" failed */
2515
2516         if (len < sizeof(value))
2517                 return -EINVAL;
2518
2519         if (copy_to_user(ov, &value, sizeof(value)))
2520                 return -EFAULT;
2521
2522         return put_user(sizeof(value), ol);
2523 }
2524
2525 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2526 {
2527         struct sock *sk = sock->sk;
2528         struct tipc_sioc_ln_req lnr;
2529         void __user *argp = (void __user *)arg;
2530
2531         switch (cmd) {
2532         case SIOCGETLINKNAME:
2533                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2534                         return -EFAULT;
2535                 if (!tipc_node_get_linkname(sock_net(sk),
2536                                             lnr.bearer_id & 0xffff, lnr.peer,
2537                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2538                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2539                                 return -EFAULT;
2540                         return 0;
2541                 }
2542                 return -EADDRNOTAVAIL;
2543         default:
2544                 return -ENOIOCTLCMD;
2545         }
2546 }
2547
2548 /* Protocol switches for the various types of TIPC sockets */
2549
2550 static const struct proto_ops msg_ops = {
2551         .owner          = THIS_MODULE,
2552         .family         = AF_TIPC,
2553         .release        = tipc_release,
2554         .bind           = tipc_bind,
2555         .connect        = tipc_connect,
2556         .socketpair     = sock_no_socketpair,
2557         .accept         = sock_no_accept,
2558         .getname        = tipc_getname,
2559         .poll           = tipc_poll,
2560         .ioctl          = tipc_ioctl,
2561         .listen         = sock_no_listen,
2562         .shutdown       = tipc_shutdown,
2563         .setsockopt     = tipc_setsockopt,
2564         .getsockopt     = tipc_getsockopt,
2565         .sendmsg        = tipc_sendmsg,
2566         .recvmsg        = tipc_recvmsg,
2567         .mmap           = sock_no_mmap,
2568         .sendpage       = sock_no_sendpage
2569 };
2570
2571 static const struct proto_ops packet_ops = {
2572         .owner          = THIS_MODULE,
2573         .family         = AF_TIPC,
2574         .release        = tipc_release,
2575         .bind           = tipc_bind,
2576         .connect        = tipc_connect,
2577         .socketpair     = sock_no_socketpair,
2578         .accept         = tipc_accept,
2579         .getname        = tipc_getname,
2580         .poll           = tipc_poll,
2581         .ioctl          = tipc_ioctl,
2582         .listen         = tipc_listen,
2583         .shutdown       = tipc_shutdown,
2584         .setsockopt     = tipc_setsockopt,
2585         .getsockopt     = tipc_getsockopt,
2586         .sendmsg        = tipc_send_packet,
2587         .recvmsg        = tipc_recvmsg,
2588         .mmap           = sock_no_mmap,
2589         .sendpage       = sock_no_sendpage
2590 };
2591
2592 static const struct proto_ops stream_ops = {
2593         .owner          = THIS_MODULE,
2594         .family         = AF_TIPC,
2595         .release        = tipc_release,
2596         .bind           = tipc_bind,
2597         .connect        = tipc_connect,
2598         .socketpair     = sock_no_socketpair,
2599         .accept         = tipc_accept,
2600         .getname        = tipc_getname,
2601         .poll           = tipc_poll,
2602         .ioctl          = tipc_ioctl,
2603         .listen         = tipc_listen,
2604         .shutdown       = tipc_shutdown,
2605         .setsockopt     = tipc_setsockopt,
2606         .getsockopt     = tipc_getsockopt,
2607         .sendmsg        = tipc_send_stream,
2608         .recvmsg        = tipc_recv_stream,
2609         .mmap           = sock_no_mmap,
2610         .sendpage       = sock_no_sendpage
2611 };
2612
2613 static const struct net_proto_family tipc_family_ops = {
2614         .owner          = THIS_MODULE,
2615         .family         = AF_TIPC,
2616         .create         = tipc_sk_create
2617 };
2618
2619 static struct proto tipc_proto = {
2620         .name           = "TIPC",
2621         .owner          = THIS_MODULE,
2622         .obj_size       = sizeof(struct tipc_sock),
2623         .sysctl_rmem    = sysctl_tipc_rmem
2624 };
2625
2626 static struct proto tipc_proto_kern = {
2627         .name           = "TIPC",
2628         .obj_size       = sizeof(struct tipc_sock),
2629         .sysctl_rmem    = sysctl_tipc_rmem
2630 };
2631
2632 /**
2633  * tipc_socket_init - initialize TIPC socket interface
2634  *
2635  * Returns 0 on success, errno otherwise
2636  */
2637 int tipc_socket_init(void)
2638 {
2639         int res;
2640
2641         res = proto_register(&tipc_proto, 1);
2642         if (res) {
2643                 pr_err("Failed to register TIPC protocol type\n");
2644                 goto out;
2645         }
2646
2647         res = sock_register(&tipc_family_ops);
2648         if (res) {
2649                 pr_err("Failed to register TIPC socket type\n");
2650                 proto_unregister(&tipc_proto);
2651                 goto out;
2652         }
2653  out:
2654         return res;
2655 }
2656
2657 /**
2658  * tipc_socket_stop - stop TIPC socket interface
2659  */
2660 void tipc_socket_stop(void)
2661 {
2662         sock_unregister(tipc_family_ops.family);
2663         proto_unregister(&tipc_proto);
2664 }
2665
2666 /* Caller should hold socket lock for the passed tipc socket. */
2667 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2668 {
2669         u32 peer_node;
2670         u32 peer_port;
2671         struct nlattr *nest;
2672
2673         peer_node = tsk_peer_node(tsk);
2674         peer_port = tsk_peer_port(tsk);
2675
2676         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2677
2678         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2679                 goto msg_full;
2680         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2681                 goto msg_full;
2682
2683         if (tsk->conn_type != 0) {
2684                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2685                         goto msg_full;
2686                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2687                         goto msg_full;
2688                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2689                         goto msg_full;
2690         }
2691         nla_nest_end(skb, nest);
2692
2693         return 0;
2694
2695 msg_full:
2696         nla_nest_cancel(skb, nest);
2697
2698         return -EMSGSIZE;
2699 }
2700
2701 /* Caller should hold socket lock for the passed tipc socket. */
2702 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2703                             struct tipc_sock *tsk)
2704 {
2705         int err;
2706         void *hdr;
2707         struct nlattr *attrs;
2708         struct net *net = sock_net(skb->sk);
2709         struct tipc_net *tn = net_generic(net, tipc_net_id);
2710
2711         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2712                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2713         if (!hdr)
2714                 goto msg_cancel;
2715
2716         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2717         if (!attrs)
2718                 goto genlmsg_cancel;
2719         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2720                 goto attr_msg_cancel;
2721         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2722                 goto attr_msg_cancel;
2723
2724         if (tsk->connected) {
2725                 err = __tipc_nl_add_sk_con(skb, tsk);
2726                 if (err)
2727                         goto attr_msg_cancel;
2728         } else if (!list_empty(&tsk->publications)) {
2729                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2730                         goto attr_msg_cancel;
2731         }
2732         nla_nest_end(skb, attrs);
2733         genlmsg_end(skb, hdr);
2734
2735         return 0;
2736
2737 attr_msg_cancel:
2738         nla_nest_cancel(skb, attrs);
2739 genlmsg_cancel:
2740         genlmsg_cancel(skb, hdr);
2741 msg_cancel:
2742         return -EMSGSIZE;
2743 }
2744
2745 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2746 {
2747         int err;
2748         struct tipc_sock *tsk;
2749         const struct bucket_table *tbl;
2750         struct rhash_head *pos;
2751         u32 prev_portid = cb->args[0];
2752         u32 portid = prev_portid;
2753         struct net *net = sock_net(skb->sk);
2754         struct tipc_net *tn = net_generic(net, tipc_net_id);
2755         int i;
2756
2757         rcu_read_lock();
2758         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2759         for (i = 0; i < tbl->size; i++) {
2760                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2761                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2762                         portid = tsk->portid;
2763                         err = __tipc_nl_add_sk(skb, cb, tsk);
2764                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2765                         if (err)
2766                                 break;
2767
2768                         prev_portid = portid;
2769                 }
2770         }
2771         rcu_read_unlock();
2772
2773         cb->args[0] = prev_portid;
2774
2775         return skb->len;
2776 }
2777
2778 /* Caller should hold socket lock for the passed tipc socket. */
2779 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2780                                  struct netlink_callback *cb,
2781                                  struct publication *publ)
2782 {
2783         void *hdr;
2784         struct nlattr *attrs;
2785
2786         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2787                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2788         if (!hdr)
2789                 goto msg_cancel;
2790
2791         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2792         if (!attrs)
2793                 goto genlmsg_cancel;
2794
2795         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2796                 goto attr_msg_cancel;
2797         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2798                 goto attr_msg_cancel;
2799         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2800                 goto attr_msg_cancel;
2801         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2802                 goto attr_msg_cancel;
2803
2804         nla_nest_end(skb, attrs);
2805         genlmsg_end(skb, hdr);
2806
2807         return 0;
2808
2809 attr_msg_cancel:
2810         nla_nest_cancel(skb, attrs);
2811 genlmsg_cancel:
2812         genlmsg_cancel(skb, hdr);
2813 msg_cancel:
2814         return -EMSGSIZE;
2815 }
2816
2817 /* Caller should hold socket lock for the passed tipc socket. */
2818 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2819                                   struct netlink_callback *cb,
2820                                   struct tipc_sock *tsk, u32 *last_publ)
2821 {
2822         int err;
2823         struct publication *p;
2824
2825         if (*last_publ) {
2826                 list_for_each_entry(p, &tsk->publications, pport_list) {
2827                         if (p->key == *last_publ)
2828                                 break;
2829                 }
2830                 if (p->key != *last_publ) {
2831                         /* We never set seq or call nl_dump_check_consistent()
2832                          * this means that setting prev_seq here will cause the
2833                          * consistence check to fail in the netlink callback
2834                          * handler. Resulting in the last NLMSG_DONE message
2835                          * having the NLM_F_DUMP_INTR flag set.
2836                          */
2837                         cb->prev_seq = 1;
2838                         *last_publ = 0;
2839                         return -EPIPE;
2840                 }
2841         } else {
2842                 p = list_first_entry(&tsk->publications, struct publication,
2843                                      pport_list);
2844         }
2845
2846         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2847                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2848                 if (err) {
2849                         *last_publ = p->key;
2850                         return err;
2851                 }
2852         }
2853         *last_publ = 0;
2854
2855         return 0;
2856 }
2857
2858 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2859 {
2860         int err;
2861         u32 tsk_portid = cb->args[0];
2862         u32 last_publ = cb->args[1];
2863         u32 done = cb->args[2];
2864         struct net *net = sock_net(skb->sk);
2865         struct tipc_sock *tsk;
2866
2867         if (!tsk_portid) {
2868                 struct nlattr **attrs;
2869                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2870
2871                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2872                 if (err)
2873                         return err;
2874
2875                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2876                                        attrs[TIPC_NLA_SOCK],
2877                                        tipc_nl_sock_policy);
2878                 if (err)
2879                         return err;
2880
2881                 if (!sock[TIPC_NLA_SOCK_REF])
2882                         return -EINVAL;
2883
2884                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2885         }
2886
2887         if (done)
2888                 return 0;
2889
2890         tsk = tipc_sk_lookup(net, tsk_portid);
2891         if (!tsk)
2892                 return -EINVAL;
2893
2894         lock_sock(&tsk->sk);
2895         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2896         if (!err)
2897                 done = 1;
2898         release_sock(&tsk->sk);
2899         sock_put(&tsk->sk);
2900
2901         cb->args[0] = tsk_portid;
2902         cb->args[1] = last_publ;
2903         cb->args[2] = done;
2904
2905         return skb->len;
2906 }