Merge branch 'master' of git://git.kernel.org/pub/scm/linux/kernel/git/kvalo/wireless...
[cascardo/linux.git] / net / tipc / socket.c
1 /*
2  * net/tipc/socket.c: TIPC socket API
3  *
4  * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include <linux/rhashtable.h>
38 #include <linux/jhash.h>
39 #include "core.h"
40 #include "name_table.h"
41 #include "node.h"
42 #include "link.h"
43 #include "config.h"
44 #include "socket.h"
45
46 #define SS_LISTENING            -1      /* socket is listening */
47 #define SS_READY                -2      /* socket is connectionless */
48
49 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
50 #define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
51 #define TIPC_FWD_MSG            1
52 #define TIPC_CONN_OK            0
53 #define TIPC_CONN_PROBING       1
54 #define TIPC_MAX_PORT           0xffffffff
55 #define TIPC_MIN_PORT           1
56
57 /**
58  * struct tipc_sock - TIPC socket structure
59  * @sk: socket - interacts with 'port' and with user via the socket API
60  * @connected: non-zero if port is currently connected to a peer port
61  * @conn_type: TIPC type used when connection was established
62  * @conn_instance: TIPC instance used when connection was established
63  * @published: non-zero if port has one or more associated names
64  * @max_pkt: maximum packet size "hint" used when building messages sent by port
65  * @portid: unique port identity in TIPC socket hash table
66  * @phdr: preformatted message header used when sending messages
67  * @port_list: adjacent ports in TIPC's global list of ports
68  * @publications: list of publications for port
69  * @pub_count: total # of publications port has made during its lifetime
70  * @probing_state:
71  * @probing_intv:
72  * @port: port - interacts with 'sk' and with the rest of the TIPC stack
73  * @peer_name: the peer of the connection, if any
74  * @conn_timeout: the time we can wait for an unresponded setup request
75  * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
76  * @link_cong: non-zero if owner must sleep because of link congestion
77  * @sent_unacked: # messages sent by socket, and not yet acked by peer
78  * @rcv_unacked: # messages read by user, but not yet acked back to peer
79  * @node: hash table node
80  * @rcu: rcu struct for tipc_sock
81  */
82 struct tipc_sock {
83         struct sock sk;
84         int connected;
85         u32 conn_type;
86         u32 conn_instance;
87         int published;
88         u32 max_pkt;
89         u32 portid;
90         struct tipc_msg phdr;
91         struct list_head sock_list;
92         struct list_head publications;
93         u32 pub_count;
94         u32 probing_state;
95         unsigned long probing_intv;
96         uint conn_timeout;
97         atomic_t dupl_rcvcnt;
98         bool link_cong;
99         uint sent_unacked;
100         uint rcv_unacked;
101         struct rhash_head node;
102         struct rcu_head rcu;
103 };
104
105 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
106 static void tipc_data_ready(struct sock *sk);
107 static void tipc_write_space(struct sock *sk);
108 static int tipc_release(struct socket *sock);
109 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
110 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
111 static void tipc_sk_timeout(unsigned long data);
112 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
113                            struct tipc_name_seq const *seq);
114 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
115                             struct tipc_name_seq const *seq);
116 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
117 static int tipc_sk_insert(struct tipc_sock *tsk);
118 static void tipc_sk_remove(struct tipc_sock *tsk);
119
120 static const struct proto_ops packet_ops;
121 static const struct proto_ops stream_ops;
122 static const struct proto_ops msg_ops;
123
124 static struct proto tipc_proto;
125 static struct proto tipc_proto_kern;
126
127 static const struct nla_policy tipc_nl_sock_policy[TIPC_NLA_SOCK_MAX + 1] = {
128         [TIPC_NLA_SOCK_UNSPEC]          = { .type = NLA_UNSPEC },
129         [TIPC_NLA_SOCK_ADDR]            = { .type = NLA_U32 },
130         [TIPC_NLA_SOCK_REF]             = { .type = NLA_U32 },
131         [TIPC_NLA_SOCK_CON]             = { .type = NLA_NESTED },
132         [TIPC_NLA_SOCK_HAS_PUBL]        = { .type = NLA_FLAG }
133 };
134
135 /*
136  * Revised TIPC socket locking policy:
137  *
138  * Most socket operations take the standard socket lock when they start
139  * and hold it until they finish (or until they need to sleep).  Acquiring
140  * this lock grants the owner exclusive access to the fields of the socket
141  * data structures, with the exception of the backlog queue.  A few socket
142  * operations can be done without taking the socket lock because they only
143  * read socket information that never changes during the life of the socket.
144  *
145  * Socket operations may acquire the lock for the associated TIPC port if they
146  * need to perform an operation on the port.  If any routine needs to acquire
147  * both the socket lock and the port lock it must take the socket lock first
148  * to avoid the risk of deadlock.
149  *
150  * The dispatcher handling incoming messages cannot grab the socket lock in
151  * the standard fashion, since invoked it runs at the BH level and cannot block.
152  * Instead, it checks to see if the socket lock is currently owned by someone,
153  * and either handles the message itself or adds it to the socket's backlog
154  * queue; in the latter case the queued message is processed once the process
155  * owning the socket lock releases it.
156  *
157  * NOTE: Releasing the socket lock while an operation is sleeping overcomes
158  * the problem of a blocked socket operation preventing any other operations
159  * from occurring.  However, applications must be careful if they have
160  * multiple threads trying to send (or receive) on the same socket, as these
161  * operations might interfere with each other.  For example, doing a connect
162  * and a receive at the same time might allow the receive to consume the
163  * ACK message meant for the connect.  While additional work could be done
164  * to try and overcome this, it doesn't seem to be worthwhile at the present.
165  *
166  * NOTE: Releasing the socket lock while an operation is sleeping also ensures
167  * that another operation that must be performed in a non-blocking manner is
168  * not delayed for very long because the lock has already been taken.
169  *
170  * NOTE: This code assumes that certain fields of a port/socket pair are
171  * constant over its lifetime; such fields can be examined without taking
172  * the socket lock and/or port lock, and do not need to be re-read even
173  * after resuming processing after waiting.  These fields include:
174  *   - socket type
175  *   - pointer to socket sk structure (aka tipc_sock structure)
176  *   - pointer to port structure
177  *   - port reference
178  */
179
180 static u32 tsk_peer_node(struct tipc_sock *tsk)
181 {
182         return msg_destnode(&tsk->phdr);
183 }
184
185 static u32 tsk_peer_port(struct tipc_sock *tsk)
186 {
187         return msg_destport(&tsk->phdr);
188 }
189
190 static  bool tsk_unreliable(struct tipc_sock *tsk)
191 {
192         return msg_src_droppable(&tsk->phdr) != 0;
193 }
194
195 static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
196 {
197         msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
198 }
199
200 static bool tsk_unreturnable(struct tipc_sock *tsk)
201 {
202         return msg_dest_droppable(&tsk->phdr) != 0;
203 }
204
205 static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
206 {
207         msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
208 }
209
210 static int tsk_importance(struct tipc_sock *tsk)
211 {
212         return msg_importance(&tsk->phdr);
213 }
214
215 static int tsk_set_importance(struct tipc_sock *tsk, int imp)
216 {
217         if (imp > TIPC_CRITICAL_IMPORTANCE)
218                 return -EINVAL;
219         msg_set_importance(&tsk->phdr, (u32)imp);
220         return 0;
221 }
222
223 static struct tipc_sock *tipc_sk(const struct sock *sk)
224 {
225         return container_of(sk, struct tipc_sock, sk);
226 }
227
228 static int tsk_conn_cong(struct tipc_sock *tsk)
229 {
230         return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
231 }
232
233 /**
234  * tsk_advance_rx_queue - discard first buffer in socket receive queue
235  *
236  * Caller must hold socket lock
237  */
238 static void tsk_advance_rx_queue(struct sock *sk)
239 {
240         kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
241 }
242
243 /**
244  * tsk_rej_rx_queue - reject all buffers in socket receive queue
245  *
246  * Caller must hold socket lock
247  */
248 static void tsk_rej_rx_queue(struct sock *sk)
249 {
250         struct sk_buff *skb;
251         u32 dnode;
252         struct net *net = sock_net(sk);
253
254         while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
255                 if (tipc_msg_reverse(net, skb, &dnode, TIPC_ERR_NO_PORT))
256                         tipc_link_xmit_skb(net, skb, dnode, 0);
257         }
258 }
259
260 /* tsk_peer_msg - verify if message was sent by connected port's peer
261  *
262  * Handles cases where the node's network address has changed from
263  * the default of <0.0.0> to its configured setting.
264  */
265 static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
266 {
267         struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
268         u32 peer_port = tsk_peer_port(tsk);
269         u32 orig_node;
270         u32 peer_node;
271
272         if (unlikely(!tsk->connected))
273                 return false;
274
275         if (unlikely(msg_origport(msg) != peer_port))
276                 return false;
277
278         orig_node = msg_orignode(msg);
279         peer_node = tsk_peer_node(tsk);
280
281         if (likely(orig_node == peer_node))
282                 return true;
283
284         if (!orig_node && (peer_node == tn->own_addr))
285                 return true;
286
287         if (!peer_node && (orig_node == tn->own_addr))
288                 return true;
289
290         return false;
291 }
292
293 /**
294  * tipc_sk_create - create a TIPC socket
295  * @net: network namespace (must be default network)
296  * @sock: pre-allocated socket structure
297  * @protocol: protocol indicator (must be 0)
298  * @kern: caused by kernel or by userspace?
299  *
300  * This routine creates additional data structures used by the TIPC socket,
301  * initializes them, and links them together.
302  *
303  * Returns 0 on success, errno otherwise
304  */
305 static int tipc_sk_create(struct net *net, struct socket *sock,
306                           int protocol, int kern)
307 {
308         const struct proto_ops *ops;
309         socket_state state;
310         struct sock *sk;
311         struct tipc_sock *tsk;
312         struct tipc_msg *msg;
313
314         /* Validate arguments */
315         if (unlikely(protocol != 0))
316                 return -EPROTONOSUPPORT;
317
318         switch (sock->type) {
319         case SOCK_STREAM:
320                 ops = &stream_ops;
321                 state = SS_UNCONNECTED;
322                 break;
323         case SOCK_SEQPACKET:
324                 ops = &packet_ops;
325                 state = SS_UNCONNECTED;
326                 break;
327         case SOCK_DGRAM:
328         case SOCK_RDM:
329                 ops = &msg_ops;
330                 state = SS_READY;
331                 break;
332         default:
333                 return -EPROTOTYPE;
334         }
335
336         /* Allocate socket's protocol area */
337         if (!kern)
338                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
339         else
340                 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
341
342         if (sk == NULL)
343                 return -ENOMEM;
344
345         tsk = tipc_sk(sk);
346         tsk->max_pkt = MAX_PKT_DEFAULT;
347         INIT_LIST_HEAD(&tsk->publications);
348         msg = &tsk->phdr;
349         tipc_msg_init(net, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
350                       NAMED_H_SIZE, 0);
351
352         /* Finish initializing socket data structures */
353         sock->ops = ops;
354         sock->state = state;
355         sock_init_data(sock, sk);
356         if (tipc_sk_insert(tsk)) {
357                 pr_warn("Socket create failed; port numbrer exhausted\n");
358                 return -EINVAL;
359         }
360         msg_set_origport(msg, tsk->portid);
361         setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
362         sk->sk_backlog_rcv = tipc_backlog_rcv;
363         sk->sk_rcvbuf = sysctl_tipc_rmem[1];
364         sk->sk_data_ready = tipc_data_ready;
365         sk->sk_write_space = tipc_write_space;
366         tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
367         tsk->sent_unacked = 0;
368         atomic_set(&tsk->dupl_rcvcnt, 0);
369
370         if (sock->state == SS_READY) {
371                 tsk_set_unreturnable(tsk, true);
372                 if (sock->type == SOCK_DGRAM)
373                         tsk_set_unreliable(tsk, true);
374         }
375         return 0;
376 }
377
378 /**
379  * tipc_sock_create_local - create TIPC socket from inside TIPC module
380  * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
381  *
382  * We cannot use sock_creat_kern here because it bumps module user count.
383  * Since socket owner and creator is the same module we must make sure
384  * that module count remains zero for module local sockets, otherwise
385  * we cannot do rmmod.
386  *
387  * Returns 0 on success, errno otherwise
388  */
389 int tipc_sock_create_local(struct net *net, int type, struct socket **res)
390 {
391         int rc;
392
393         rc = sock_create_lite(AF_TIPC, type, 0, res);
394         if (rc < 0) {
395                 pr_err("Failed to create kernel socket\n");
396                 return rc;
397         }
398         tipc_sk_create(net, *res, 0, 1);
399
400         return 0;
401 }
402
403 /**
404  * tipc_sock_release_local - release socket created by tipc_sock_create_local
405  * @sock: the socket to be released.
406  *
407  * Module reference count is not incremented when such sockets are created,
408  * so we must keep it from being decremented when they are released.
409  */
410 void tipc_sock_release_local(struct socket *sock)
411 {
412         tipc_release(sock);
413         sock->ops = NULL;
414         sock_release(sock);
415 }
416
417 /**
418  * tipc_sock_accept_local - accept a connection on a socket created
419  * with tipc_sock_create_local. Use this function to avoid that
420  * module reference count is inadvertently incremented.
421  *
422  * @sock:    the accepting socket
423  * @newsock: reference to the new socket to be created
424  * @flags:   socket flags
425  */
426
427 int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
428                            int flags)
429 {
430         struct sock *sk = sock->sk;
431         int ret;
432
433         ret = sock_create_lite(sk->sk_family, sk->sk_type,
434                                sk->sk_protocol, newsock);
435         if (ret < 0)
436                 return ret;
437
438         ret = tipc_accept(sock, *newsock, flags);
439         if (ret < 0) {
440                 sock_release(*newsock);
441                 return ret;
442         }
443         (*newsock)->ops = sock->ops;
444         return ret;
445 }
446
447 static void tipc_sk_callback(struct rcu_head *head)
448 {
449         struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
450
451         sock_put(&tsk->sk);
452 }
453
454 /**
455  * tipc_release - destroy a TIPC socket
456  * @sock: socket to destroy
457  *
458  * This routine cleans up any messages that are still queued on the socket.
459  * For DGRAM and RDM socket types, all queued messages are rejected.
460  * For SEQPACKET and STREAM socket types, the first message is rejected
461  * and any others are discarded.  (If the first message on a STREAM socket
462  * is partially-read, it is discarded and the next one is rejected instead.)
463  *
464  * NOTE: Rejected messages are not necessarily returned to the sender!  They
465  * are returned or discarded according to the "destination droppable" setting
466  * specified for the message by the sender.
467  *
468  * Returns 0 on success, errno otherwise
469  */
470 static int tipc_release(struct socket *sock)
471 {
472         struct sock *sk = sock->sk;
473         struct net *net;
474         struct tipc_net *tn;
475         struct tipc_sock *tsk;
476         struct sk_buff *skb;
477         u32 dnode, probing_state;
478
479         /*
480          * Exit if socket isn't fully initialized (occurs when a failed accept()
481          * releases a pre-allocated child socket that was never used)
482          */
483         if (sk == NULL)
484                 return 0;
485
486         net = sock_net(sk);
487         tn = net_generic(net, tipc_net_id);
488
489         tsk = tipc_sk(sk);
490         lock_sock(sk);
491
492         /*
493          * Reject all unreceived messages, except on an active connection
494          * (which disconnects locally & sends a 'FIN+' to peer)
495          */
496         dnode = tsk_peer_node(tsk);
497         while (sock->state != SS_DISCONNECTING) {
498                 skb = __skb_dequeue(&sk->sk_receive_queue);
499                 if (skb == NULL)
500                         break;
501                 if (TIPC_SKB_CB(skb)->handle != NULL)
502                         kfree_skb(skb);
503                 else {
504                         if ((sock->state == SS_CONNECTING) ||
505                             (sock->state == SS_CONNECTED)) {
506                                 sock->state = SS_DISCONNECTING;
507                                 tsk->connected = 0;
508                                 tipc_node_remove_conn(net, dnode, tsk->portid);
509                         }
510                         if (tipc_msg_reverse(net, skb, &dnode,
511                                              TIPC_ERR_NO_PORT))
512                                 tipc_link_xmit_skb(net, skb, dnode, 0);
513                 }
514         }
515
516         tipc_sk_withdraw(tsk, 0, NULL);
517         probing_state = tsk->probing_state;
518         if (del_timer_sync(&sk->sk_timer) &&
519             probing_state != TIPC_CONN_PROBING)
520                 sock_put(sk);
521         tipc_sk_remove(tsk);
522         if (tsk->connected) {
523                 skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
524                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
525                                       tn->own_addr, tsk_peer_port(tsk),
526                                       tsk->portid, TIPC_ERR_NO_PORT);
527                 if (skb)
528                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
529                 tipc_node_remove_conn(net, dnode, tsk->portid);
530         }
531
532         /* Discard any remaining (connection-based) messages in receive queue */
533         __skb_queue_purge(&sk->sk_receive_queue);
534
535         /* Reject any messages that accumulated in backlog queue */
536         sock->state = SS_DISCONNECTING;
537         release_sock(sk);
538
539         call_rcu(&tsk->rcu, tipc_sk_callback);
540         sock->sk = NULL;
541
542         return 0;
543 }
544
545 /**
546  * tipc_bind - associate or disassocate TIPC name(s) with a socket
547  * @sock: socket structure
548  * @uaddr: socket address describing name(s) and desired operation
549  * @uaddr_len: size of socket address data structure
550  *
551  * Name and name sequence binding is indicated using a positive scope value;
552  * a negative scope value unbinds the specified name.  Specifying no name
553  * (i.e. a socket address length of 0) unbinds all names from the socket.
554  *
555  * Returns 0 on success, errno otherwise
556  *
557  * NOTE: This routine doesn't need to take the socket lock since it doesn't
558  *       access any non-constant socket information.
559  */
560 static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
561                      int uaddr_len)
562 {
563         struct sock *sk = sock->sk;
564         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
565         struct tipc_sock *tsk = tipc_sk(sk);
566         int res = -EINVAL;
567
568         lock_sock(sk);
569         if (unlikely(!uaddr_len)) {
570                 res = tipc_sk_withdraw(tsk, 0, NULL);
571                 goto exit;
572         }
573
574         if (uaddr_len < sizeof(struct sockaddr_tipc)) {
575                 res = -EINVAL;
576                 goto exit;
577         }
578         if (addr->family != AF_TIPC) {
579                 res = -EAFNOSUPPORT;
580                 goto exit;
581         }
582
583         if (addr->addrtype == TIPC_ADDR_NAME)
584                 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
585         else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
586                 res = -EAFNOSUPPORT;
587                 goto exit;
588         }
589
590         if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
591             (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
592             (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
593                 res = -EACCES;
594                 goto exit;
595         }
596
597         res = (addr->scope > 0) ?
598                 tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
599                 tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
600 exit:
601         release_sock(sk);
602         return res;
603 }
604
605 /**
606  * tipc_getname - get port ID of socket or peer socket
607  * @sock: socket structure
608  * @uaddr: area for returned socket address
609  * @uaddr_len: area for returned length of socket address
610  * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
611  *
612  * Returns 0 on success, errno otherwise
613  *
614  * NOTE: This routine doesn't need to take the socket lock since it only
615  *       accesses socket information that is unchanging (or which changes in
616  *       a completely predictable manner).
617  */
618 static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
619                         int *uaddr_len, int peer)
620 {
621         struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
622         struct tipc_sock *tsk = tipc_sk(sock->sk);
623         struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
624
625         memset(addr, 0, sizeof(*addr));
626         if (peer) {
627                 if ((sock->state != SS_CONNECTED) &&
628                         ((peer != 2) || (sock->state != SS_DISCONNECTING)))
629                         return -ENOTCONN;
630                 addr->addr.id.ref = tsk_peer_port(tsk);
631                 addr->addr.id.node = tsk_peer_node(tsk);
632         } else {
633                 addr->addr.id.ref = tsk->portid;
634                 addr->addr.id.node = tn->own_addr;
635         }
636
637         *uaddr_len = sizeof(*addr);
638         addr->addrtype = TIPC_ADDR_ID;
639         addr->family = AF_TIPC;
640         addr->scope = 0;
641         addr->addr.name.domain = 0;
642
643         return 0;
644 }
645
646 /**
647  * tipc_poll - read and possibly block on pollmask
648  * @file: file structure associated with the socket
649  * @sock: socket for which to calculate the poll bits
650  * @wait: ???
651  *
652  * Returns pollmask value
653  *
654  * COMMENTARY:
655  * It appears that the usual socket locking mechanisms are not useful here
656  * since the pollmask info is potentially out-of-date the moment this routine
657  * exits.  TCP and other protocols seem to rely on higher level poll routines
658  * to handle any preventable race conditions, so TIPC will do the same ...
659  *
660  * TIPC sets the returned events as follows:
661  *
662  * socket state         flags set
663  * ------------         ---------
664  * unconnected          no read flags
665  *                      POLLOUT if port is not congested
666  *
667  * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
668  *                      no write flags
669  *
670  * connected            POLLIN/POLLRDNORM if data in rx queue
671  *                      POLLOUT if port is not congested
672  *
673  * disconnecting        POLLIN/POLLRDNORM/POLLHUP
674  *                      no write flags
675  *
676  * listening            POLLIN if SYN in rx queue
677  *                      no write flags
678  *
679  * ready                POLLIN/POLLRDNORM if data in rx queue
680  * [connectionless]     POLLOUT (since port cannot be congested)
681  *
682  * IMPORTANT: The fact that a read or write operation is indicated does NOT
683  * imply that the operation will succeed, merely that it should be performed
684  * and will not block.
685  */
686 static unsigned int tipc_poll(struct file *file, struct socket *sock,
687                               poll_table *wait)
688 {
689         struct sock *sk = sock->sk;
690         struct tipc_sock *tsk = tipc_sk(sk);
691         u32 mask = 0;
692
693         sock_poll_wait(file, sk_sleep(sk), wait);
694
695         switch ((int)sock->state) {
696         case SS_UNCONNECTED:
697                 if (!tsk->link_cong)
698                         mask |= POLLOUT;
699                 break;
700         case SS_READY:
701         case SS_CONNECTED:
702                 if (!tsk->link_cong && !tsk_conn_cong(tsk))
703                         mask |= POLLOUT;
704                 /* fall thru' */
705         case SS_CONNECTING:
706         case SS_LISTENING:
707                 if (!skb_queue_empty(&sk->sk_receive_queue))
708                         mask |= (POLLIN | POLLRDNORM);
709                 break;
710         case SS_DISCONNECTING:
711                 mask = (POLLIN | POLLRDNORM | POLLHUP);
712                 break;
713         }
714
715         return mask;
716 }
717
718 /**
719  * tipc_sendmcast - send multicast message
720  * @sock: socket structure
721  * @seq: destination address
722  * @msg: message to send
723  * @dsz: total length of message data
724  * @timeo: timeout to wait for wakeup
725  *
726  * Called from function tipc_sendmsg(), which has done all sanity checks
727  * Returns the number of bytes sent on success, or errno
728  */
729 static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
730                           struct msghdr *msg, size_t dsz, long timeo)
731 {
732         struct sock *sk = sock->sk;
733         struct net *net = sock_net(sk);
734         struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
735         struct sk_buff_head head;
736         uint mtu;
737         int rc;
738
739         msg_set_type(mhdr, TIPC_MCAST_MSG);
740         msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
741         msg_set_destport(mhdr, 0);
742         msg_set_destnode(mhdr, 0);
743         msg_set_nametype(mhdr, seq->type);
744         msg_set_namelower(mhdr, seq->lower);
745         msg_set_nameupper(mhdr, seq->upper);
746         msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
747
748 new_mtu:
749         mtu = tipc_bclink_get_mtu();
750         __skb_queue_head_init(&head);
751         rc = tipc_msg_build(net, mhdr, msg, 0, dsz, mtu, &head);
752         if (unlikely(rc < 0))
753                 return rc;
754
755         do {
756                 rc = tipc_bclink_xmit(net, &head);
757                 if (likely(rc >= 0)) {
758                         rc = dsz;
759                         break;
760                 }
761                 if (rc == -EMSGSIZE)
762                         goto new_mtu;
763                 if (rc != -ELINKCONG)
764                         break;
765                 tipc_sk(sk)->link_cong = 1;
766                 rc = tipc_wait_for_sndmsg(sock, &timeo);
767                 if (rc)
768                         __skb_queue_purge(&head);
769         } while (!rc);
770         return rc;
771 }
772
773 /* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
774  */
775 void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
776 {
777         struct tipc_msg *msg = buf_msg(buf);
778         struct tipc_port_list dports = {0, NULL, };
779         struct tipc_port_list *item;
780         struct sk_buff *b;
781         uint i, last, dst = 0;
782         u32 scope = TIPC_CLUSTER_SCOPE;
783
784         if (in_own_node(net, msg_orignode(msg)))
785                 scope = TIPC_NODE_SCOPE;
786
787         /* Create destination port list: */
788         tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
789                                   msg_nameupper(msg), scope, &dports);
790         last = dports.count;
791         if (!last) {
792                 kfree_skb(buf);
793                 return;
794         }
795
796         for (item = &dports; item; item = item->next) {
797                 for (i = 0; i < PLSIZE && ++dst <= last; i++) {
798                         b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
799                         if (!b) {
800                                 pr_warn("Failed do clone mcast rcv buffer\n");
801                                 continue;
802                         }
803                         msg_set_destport(msg, item->ports[i]);
804                         tipc_sk_rcv(net, b);
805                 }
806         }
807         tipc_port_list_free(&dports);
808 }
809
810 /**
811  * tipc_sk_proto_rcv - receive a connection mng protocol message
812  * @tsk: receiving socket
813  * @dnode: node to send response message to, if any
814  * @buf: buffer containing protocol message
815  * Returns 0 (TIPC_OK) if message was consumed, 1 (TIPC_FWD_MSG) if
816  * (CONN_PROBE_REPLY) message should be forwarded.
817  */
818 static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
819                              struct sk_buff *buf)
820 {
821         struct tipc_msg *msg = buf_msg(buf);
822         int conn_cong;
823
824         /* Ignore if connection cannot be validated: */
825         if (!tsk_peer_msg(tsk, msg))
826                 goto exit;
827
828         tsk->probing_state = TIPC_CONN_OK;
829
830         if (msg_type(msg) == CONN_ACK) {
831                 conn_cong = tsk_conn_cong(tsk);
832                 tsk->sent_unacked -= msg_msgcnt(msg);
833                 if (conn_cong)
834                         tsk->sk.sk_write_space(&tsk->sk);
835         } else if (msg_type(msg) == CONN_PROBE) {
836                 if (!tipc_msg_reverse(sock_net(&tsk->sk), buf, dnode, TIPC_OK))
837                         return TIPC_OK;
838                 msg_set_type(msg, CONN_PROBE_REPLY);
839                 return TIPC_FWD_MSG;
840         }
841         /* Do nothing if msg_type() == CONN_PROBE_REPLY */
842 exit:
843         kfree_skb(buf);
844         return TIPC_OK;
845 }
846
847 static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
848 {
849         struct sock *sk = sock->sk;
850         struct tipc_sock *tsk = tipc_sk(sk);
851         DEFINE_WAIT(wait);
852         int done;
853
854         do {
855                 int err = sock_error(sk);
856                 if (err)
857                         return err;
858                 if (sock->state == SS_DISCONNECTING)
859                         return -EPIPE;
860                 if (!*timeo_p)
861                         return -EAGAIN;
862                 if (signal_pending(current))
863                         return sock_intr_errno(*timeo_p);
864
865                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
866                 done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
867                 finish_wait(sk_sleep(sk), &wait);
868         } while (!done);
869         return 0;
870 }
871
872 /**
873  * tipc_sendmsg - send message in connectionless manner
874  * @iocb: if NULL, indicates that socket lock is already held
875  * @sock: socket structure
876  * @m: message to send
877  * @dsz: amount of user data to be sent
878  *
879  * Message must have an destination specified explicitly.
880  * Used for SOCK_RDM and SOCK_DGRAM messages,
881  * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
882  * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
883  *
884  * Returns the number of bytes sent on success, or errno otherwise
885  */
886 static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
887                         struct msghdr *m, size_t dsz)
888 {
889         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
890         struct sock *sk = sock->sk;
891         struct tipc_sock *tsk = tipc_sk(sk);
892         struct net *net = sock_net(sk);
893         struct tipc_msg *mhdr = &tsk->phdr;
894         u32 dnode, dport;
895         struct sk_buff_head head;
896         struct sk_buff *skb;
897         struct tipc_name_seq *seq = &dest->addr.nameseq;
898         u32 mtu;
899         long timeo;
900         int rc;
901
902         if (unlikely(!dest))
903                 return -EDESTADDRREQ;
904
905         if (unlikely((m->msg_namelen < sizeof(*dest)) ||
906                      (dest->family != AF_TIPC)))
907                 return -EINVAL;
908
909         if (dsz > TIPC_MAX_USER_MSG_SIZE)
910                 return -EMSGSIZE;
911
912         if (iocb)
913                 lock_sock(sk);
914
915         if (unlikely(sock->state != SS_READY)) {
916                 if (sock->state == SS_LISTENING) {
917                         rc = -EPIPE;
918                         goto exit;
919                 }
920                 if (sock->state != SS_UNCONNECTED) {
921                         rc = -EISCONN;
922                         goto exit;
923                 }
924                 if (tsk->published) {
925                         rc = -EOPNOTSUPP;
926                         goto exit;
927                 }
928                 if (dest->addrtype == TIPC_ADDR_NAME) {
929                         tsk->conn_type = dest->addr.name.name.type;
930                         tsk->conn_instance = dest->addr.name.name.instance;
931                 }
932         }
933
934         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
935
936         if (dest->addrtype == TIPC_ADDR_MCAST) {
937                 rc = tipc_sendmcast(sock, seq, m, dsz, timeo);
938                 goto exit;
939         } else if (dest->addrtype == TIPC_ADDR_NAME) {
940                 u32 type = dest->addr.name.name.type;
941                 u32 inst = dest->addr.name.name.instance;
942                 u32 domain = dest->addr.name.domain;
943
944                 dnode = domain;
945                 msg_set_type(mhdr, TIPC_NAMED_MSG);
946                 msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
947                 msg_set_nametype(mhdr, type);
948                 msg_set_nameinst(mhdr, inst);
949                 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
950                 dport = tipc_nametbl_translate(net, type, inst, &dnode);
951                 msg_set_destnode(mhdr, dnode);
952                 msg_set_destport(mhdr, dport);
953                 if (unlikely(!dport && !dnode)) {
954                         rc = -EHOSTUNREACH;
955                         goto exit;
956                 }
957         } else if (dest->addrtype == TIPC_ADDR_ID) {
958                 dnode = dest->addr.id.node;
959                 msg_set_type(mhdr, TIPC_DIRECT_MSG);
960                 msg_set_lookup_scope(mhdr, 0);
961                 msg_set_destnode(mhdr, dnode);
962                 msg_set_destport(mhdr, dest->addr.id.ref);
963                 msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
964         }
965
966 new_mtu:
967         mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
968         __skb_queue_head_init(&head);
969         rc = tipc_msg_build(net, mhdr, m, 0, dsz, mtu, &head);
970         if (rc < 0)
971                 goto exit;
972
973         do {
974                 skb = skb_peek(&head);
975                 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
976                 rc = tipc_link_xmit(net, &head, dnode, tsk->portid);
977                 if (likely(rc >= 0)) {
978                         if (sock->state != SS_READY)
979                                 sock->state = SS_CONNECTING;
980                         rc = dsz;
981                         break;
982                 }
983                 if (rc == -EMSGSIZE)
984                         goto new_mtu;
985                 if (rc != -ELINKCONG)
986                         break;
987                 tsk->link_cong = 1;
988                 rc = tipc_wait_for_sndmsg(sock, &timeo);
989                 if (rc)
990                         __skb_queue_purge(&head);
991         } while (!rc);
992 exit:
993         if (iocb)
994                 release_sock(sk);
995
996         return rc;
997 }
998
999 static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
1000 {
1001         struct sock *sk = sock->sk;
1002         struct tipc_sock *tsk = tipc_sk(sk);
1003         DEFINE_WAIT(wait);
1004         int done;
1005
1006         do {
1007                 int err = sock_error(sk);
1008                 if (err)
1009                         return err;
1010                 if (sock->state == SS_DISCONNECTING)
1011                         return -EPIPE;
1012                 else if (sock->state != SS_CONNECTED)
1013                         return -ENOTCONN;
1014                 if (!*timeo_p)
1015                         return -EAGAIN;
1016                 if (signal_pending(current))
1017                         return sock_intr_errno(*timeo_p);
1018
1019                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1020                 done = sk_wait_event(sk, timeo_p,
1021                                      (!tsk->link_cong &&
1022                                       !tsk_conn_cong(tsk)) ||
1023                                      !tsk->connected);
1024                 finish_wait(sk_sleep(sk), &wait);
1025         } while (!done);
1026         return 0;
1027 }
1028
1029 /**
1030  * tipc_send_stream - send stream-oriented data
1031  * @iocb: (unused)
1032  * @sock: socket structure
1033  * @m: data to send
1034  * @dsz: total length of data to be transmitted
1035  *
1036  * Used for SOCK_STREAM data.
1037  *
1038  * Returns the number of bytes sent on success (or partial success),
1039  * or errno if no data sent
1040  */
1041 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1042                             struct msghdr *m, size_t dsz)
1043 {
1044         struct sock *sk = sock->sk;
1045         struct net *net = sock_net(sk);
1046         struct tipc_sock *tsk = tipc_sk(sk);
1047         struct tipc_msg *mhdr = &tsk->phdr;
1048         struct sk_buff_head head;
1049         DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1050         u32 portid = tsk->portid;
1051         int rc = -EINVAL;
1052         long timeo;
1053         u32 dnode;
1054         uint mtu, send, sent = 0;
1055
1056         /* Handle implied connection establishment */
1057         if (unlikely(dest)) {
1058                 rc = tipc_sendmsg(iocb, sock, m, dsz);
1059                 if (dsz && (dsz == rc))
1060                         tsk->sent_unacked = 1;
1061                 return rc;
1062         }
1063         if (dsz > (uint)INT_MAX)
1064                 return -EMSGSIZE;
1065
1066         if (iocb)
1067                 lock_sock(sk);
1068
1069         if (unlikely(sock->state != SS_CONNECTED)) {
1070                 if (sock->state == SS_DISCONNECTING)
1071                         rc = -EPIPE;
1072                 else
1073                         rc = -ENOTCONN;
1074                 goto exit;
1075         }
1076
1077         timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1078         dnode = tsk_peer_node(tsk);
1079
1080 next:
1081         mtu = tsk->max_pkt;
1082         send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1083         __skb_queue_head_init(&head);
1084         rc = tipc_msg_build(net, mhdr, m, sent, send, mtu, &head);
1085         if (unlikely(rc < 0))
1086                 goto exit;
1087         do {
1088                 if (likely(!tsk_conn_cong(tsk))) {
1089                         rc = tipc_link_xmit(net, &head, dnode, portid);
1090                         if (likely(!rc)) {
1091                                 tsk->sent_unacked++;
1092                                 sent += send;
1093                                 if (sent == dsz)
1094                                         break;
1095                                 goto next;
1096                         }
1097                         if (rc == -EMSGSIZE) {
1098                                 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1099                                                                  portid);
1100                                 goto next;
1101                         }
1102                         if (rc != -ELINKCONG)
1103                                 break;
1104                         tsk->link_cong = 1;
1105                 }
1106                 rc = tipc_wait_for_sndpkt(sock, &timeo);
1107                 if (rc)
1108                         __skb_queue_purge(&head);
1109         } while (!rc);
1110 exit:
1111         if (iocb)
1112                 release_sock(sk);
1113         return sent ? sent : rc;
1114 }
1115
1116 /**
1117  * tipc_send_packet - send a connection-oriented message
1118  * @iocb: if NULL, indicates that socket lock is already held
1119  * @sock: socket structure
1120  * @m: message to send
1121  * @dsz: length of data to be transmitted
1122  *
1123  * Used for SOCK_SEQPACKET messages.
1124  *
1125  * Returns the number of bytes sent on success, or errno otherwise
1126  */
1127 static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
1128                             struct msghdr *m, size_t dsz)
1129 {
1130         if (dsz > TIPC_MAX_USER_MSG_SIZE)
1131                 return -EMSGSIZE;
1132
1133         return tipc_send_stream(iocb, sock, m, dsz);
1134 }
1135
1136 /* tipc_sk_finish_conn - complete the setup of a connection
1137  */
1138 static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1139                                 u32 peer_node)
1140 {
1141         struct sock *sk = &tsk->sk;
1142         struct net *net = sock_net(sk);
1143         struct tipc_msg *msg = &tsk->phdr;
1144
1145         msg_set_destnode(msg, peer_node);
1146         msg_set_destport(msg, peer_port);
1147         msg_set_type(msg, TIPC_CONN_MSG);
1148         msg_set_lookup_scope(msg, 0);
1149         msg_set_hdr_sz(msg, SHORT_H_SIZE);
1150
1151         tsk->probing_intv = CONN_PROBING_INTERVAL;
1152         tsk->probing_state = TIPC_CONN_OK;
1153         tsk->connected = 1;
1154         sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1155         tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1156         tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1157 }
1158
1159 /**
1160  * set_orig_addr - capture sender's address for received message
1161  * @m: descriptor for message info
1162  * @msg: received message header
1163  *
1164  * Note: Address is not captured if not requested by receiver.
1165  */
1166 static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1167 {
1168         DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1169
1170         if (addr) {
1171                 addr->family = AF_TIPC;
1172                 addr->addrtype = TIPC_ADDR_ID;
1173                 memset(&addr->addr, 0, sizeof(addr->addr));
1174                 addr->addr.id.ref = msg_origport(msg);
1175                 addr->addr.id.node = msg_orignode(msg);
1176                 addr->addr.name.domain = 0;     /* could leave uninitialized */
1177                 addr->scope = 0;                /* could leave uninitialized */
1178                 m->msg_namelen = sizeof(struct sockaddr_tipc);
1179         }
1180 }
1181
1182 /**
1183  * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1184  * @m: descriptor for message info
1185  * @msg: received message header
1186  * @tsk: TIPC port associated with message
1187  *
1188  * Note: Ancillary data is not captured if not requested by receiver.
1189  *
1190  * Returns 0 if successful, otherwise errno
1191  */
1192 static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1193                                  struct tipc_sock *tsk)
1194 {
1195         u32 anc_data[3];
1196         u32 err;
1197         u32 dest_type;
1198         int has_name;
1199         int res;
1200
1201         if (likely(m->msg_controllen == 0))
1202                 return 0;
1203
1204         /* Optionally capture errored message object(s) */
1205         err = msg ? msg_errcode(msg) : 0;
1206         if (unlikely(err)) {
1207                 anc_data[0] = err;
1208                 anc_data[1] = msg_data_sz(msg);
1209                 res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1210                 if (res)
1211                         return res;
1212                 if (anc_data[1]) {
1213                         res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1214                                        msg_data(msg));
1215                         if (res)
1216                                 return res;
1217                 }
1218         }
1219
1220         /* Optionally capture message destination object */
1221         dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1222         switch (dest_type) {
1223         case TIPC_NAMED_MSG:
1224                 has_name = 1;
1225                 anc_data[0] = msg_nametype(msg);
1226                 anc_data[1] = msg_namelower(msg);
1227                 anc_data[2] = msg_namelower(msg);
1228                 break;
1229         case TIPC_MCAST_MSG:
1230                 has_name = 1;
1231                 anc_data[0] = msg_nametype(msg);
1232                 anc_data[1] = msg_namelower(msg);
1233                 anc_data[2] = msg_nameupper(msg);
1234                 break;
1235         case TIPC_CONN_MSG:
1236                 has_name = (tsk->conn_type != 0);
1237                 anc_data[0] = tsk->conn_type;
1238                 anc_data[1] = tsk->conn_instance;
1239                 anc_data[2] = tsk->conn_instance;
1240                 break;
1241         default:
1242                 has_name = 0;
1243         }
1244         if (has_name) {
1245                 res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1246                 if (res)
1247                         return res;
1248         }
1249
1250         return 0;
1251 }
1252
1253 static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1254 {
1255         struct net *net = sock_net(&tsk->sk);
1256         struct tipc_net *tn = net_generic(net, tipc_net_id);
1257         struct sk_buff *skb = NULL;
1258         struct tipc_msg *msg;
1259         u32 peer_port = tsk_peer_port(tsk);
1260         u32 dnode = tsk_peer_node(tsk);
1261
1262         if (!tsk->connected)
1263                 return;
1264         skb = tipc_msg_create(net, CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1265                               dnode, tn->own_addr, peer_port, tsk->portid,
1266                               TIPC_OK);
1267         if (!skb)
1268                 return;
1269         msg = buf_msg(skb);
1270         msg_set_msgcnt(msg, ack);
1271         tipc_link_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1272 }
1273
1274 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1275 {
1276         struct sock *sk = sock->sk;
1277         DEFINE_WAIT(wait);
1278         long timeo = *timeop;
1279         int err;
1280
1281         for (;;) {
1282                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1283                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1284                         if (sock->state == SS_DISCONNECTING) {
1285                                 err = -ENOTCONN;
1286                                 break;
1287                         }
1288                         release_sock(sk);
1289                         timeo = schedule_timeout(timeo);
1290                         lock_sock(sk);
1291                 }
1292                 err = 0;
1293                 if (!skb_queue_empty(&sk->sk_receive_queue))
1294                         break;
1295                 err = sock_intr_errno(timeo);
1296                 if (signal_pending(current))
1297                         break;
1298                 err = -EAGAIN;
1299                 if (!timeo)
1300                         break;
1301         }
1302         finish_wait(sk_sleep(sk), &wait);
1303         *timeop = timeo;
1304         return err;
1305 }
1306
1307 /**
1308  * tipc_recvmsg - receive packet-oriented message
1309  * @iocb: (unused)
1310  * @m: descriptor for message info
1311  * @buf_len: total size of user buffer area
1312  * @flags: receive flags
1313  *
1314  * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1315  * If the complete message doesn't fit in user area, truncate it.
1316  *
1317  * Returns size of returned message data, errno otherwise
1318  */
1319 static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1320                         struct msghdr *m, size_t buf_len, int flags)
1321 {
1322         struct sock *sk = sock->sk;
1323         struct tipc_sock *tsk = tipc_sk(sk);
1324         struct sk_buff *buf;
1325         struct tipc_msg *msg;
1326         long timeo;
1327         unsigned int sz;
1328         u32 err;
1329         int res;
1330
1331         /* Catch invalid receive requests */
1332         if (unlikely(!buf_len))
1333                 return -EINVAL;
1334
1335         lock_sock(sk);
1336
1337         if (unlikely(sock->state == SS_UNCONNECTED)) {
1338                 res = -ENOTCONN;
1339                 goto exit;
1340         }
1341
1342         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1343 restart:
1344
1345         /* Look for a message in receive queue; wait if necessary */
1346         res = tipc_wait_for_rcvmsg(sock, &timeo);
1347         if (res)
1348                 goto exit;
1349
1350         /* Look at first message in receive queue */
1351         buf = skb_peek(&sk->sk_receive_queue);
1352         msg = buf_msg(buf);
1353         sz = msg_data_sz(msg);
1354         err = msg_errcode(msg);
1355
1356         /* Discard an empty non-errored message & try again */
1357         if ((!sz) && (!err)) {
1358                 tsk_advance_rx_queue(sk);
1359                 goto restart;
1360         }
1361
1362         /* Capture sender's address (optional) */
1363         set_orig_addr(m, msg);
1364
1365         /* Capture ancillary data (optional) */
1366         res = tipc_sk_anc_data_recv(m, msg, tsk);
1367         if (res)
1368                 goto exit;
1369
1370         /* Capture message data (if valid) & compute return value (always) */
1371         if (!err) {
1372                 if (unlikely(buf_len < sz)) {
1373                         sz = buf_len;
1374                         m->msg_flags |= MSG_TRUNC;
1375                 }
1376                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
1377                 if (res)
1378                         goto exit;
1379                 res = sz;
1380         } else {
1381                 if ((sock->state == SS_READY) ||
1382                     ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1383                         res = 0;
1384                 else
1385                         res = -ECONNRESET;
1386         }
1387
1388         /* Consume received message (optional) */
1389         if (likely(!(flags & MSG_PEEK))) {
1390                 if ((sock->state != SS_READY) &&
1391                     (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1392                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1393                         tsk->rcv_unacked = 0;
1394                 }
1395                 tsk_advance_rx_queue(sk);
1396         }
1397 exit:
1398         release_sock(sk);
1399         return res;
1400 }
1401
1402 /**
1403  * tipc_recv_stream - receive stream-oriented data
1404  * @iocb: (unused)
1405  * @m: descriptor for message info
1406  * @buf_len: total size of user buffer area
1407  * @flags: receive flags
1408  *
1409  * Used for SOCK_STREAM messages only.  If not enough data is available
1410  * will optionally wait for more; never truncates data.
1411  *
1412  * Returns size of returned message data, errno otherwise
1413  */
1414 static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1415                             struct msghdr *m, size_t buf_len, int flags)
1416 {
1417         struct sock *sk = sock->sk;
1418         struct tipc_sock *tsk = tipc_sk(sk);
1419         struct sk_buff *buf;
1420         struct tipc_msg *msg;
1421         long timeo;
1422         unsigned int sz;
1423         int sz_to_copy, target, needed;
1424         int sz_copied = 0;
1425         u32 err;
1426         int res = 0;
1427
1428         /* Catch invalid receive attempts */
1429         if (unlikely(!buf_len))
1430                 return -EINVAL;
1431
1432         lock_sock(sk);
1433
1434         if (unlikely(sock->state == SS_UNCONNECTED)) {
1435                 res = -ENOTCONN;
1436                 goto exit;
1437         }
1438
1439         target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1440         timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1441
1442 restart:
1443         /* Look for a message in receive queue; wait if necessary */
1444         res = tipc_wait_for_rcvmsg(sock, &timeo);
1445         if (res)
1446                 goto exit;
1447
1448         /* Look at first message in receive queue */
1449         buf = skb_peek(&sk->sk_receive_queue);
1450         msg = buf_msg(buf);
1451         sz = msg_data_sz(msg);
1452         err = msg_errcode(msg);
1453
1454         /* Discard an empty non-errored message & try again */
1455         if ((!sz) && (!err)) {
1456                 tsk_advance_rx_queue(sk);
1457                 goto restart;
1458         }
1459
1460         /* Optionally capture sender's address & ancillary data of first msg */
1461         if (sz_copied == 0) {
1462                 set_orig_addr(m, msg);
1463                 res = tipc_sk_anc_data_recv(m, msg, tsk);
1464                 if (res)
1465                         goto exit;
1466         }
1467
1468         /* Capture message data (if valid) & compute return value (always) */
1469         if (!err) {
1470                 u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1471
1472                 sz -= offset;
1473                 needed = (buf_len - sz_copied);
1474                 sz_to_copy = (sz <= needed) ? sz : needed;
1475
1476                 res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
1477                                             m, sz_to_copy);
1478                 if (res)
1479                         goto exit;
1480
1481                 sz_copied += sz_to_copy;
1482
1483                 if (sz_to_copy < sz) {
1484                         if (!(flags & MSG_PEEK))
1485                                 TIPC_SKB_CB(buf)->handle =
1486                                 (void *)(unsigned long)(offset + sz_to_copy);
1487                         goto exit;
1488                 }
1489         } else {
1490                 if (sz_copied != 0)
1491                         goto exit; /* can't add error msg to valid data */
1492
1493                 if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1494                         res = 0;
1495                 else
1496                         res = -ECONNRESET;
1497         }
1498
1499         /* Consume received message (optional) */
1500         if (likely(!(flags & MSG_PEEK))) {
1501                 if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
1502                         tipc_sk_send_ack(tsk, tsk->rcv_unacked);
1503                         tsk->rcv_unacked = 0;
1504                 }
1505                 tsk_advance_rx_queue(sk);
1506         }
1507
1508         /* Loop around if more data is required */
1509         if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1510             (!skb_queue_empty(&sk->sk_receive_queue) ||
1511             (sz_copied < target)) &&    /* and more is ready or required */
1512             (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1513             (!err))                     /* and haven't reached a FIN */
1514                 goto restart;
1515
1516 exit:
1517         release_sock(sk);
1518         return sz_copied ? sz_copied : res;
1519 }
1520
1521 /**
1522  * tipc_write_space - wake up thread if port congestion is released
1523  * @sk: socket
1524  */
1525 static void tipc_write_space(struct sock *sk)
1526 {
1527         struct socket_wq *wq;
1528
1529         rcu_read_lock();
1530         wq = rcu_dereference(sk->sk_wq);
1531         if (wq_has_sleeper(wq))
1532                 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1533                                                 POLLWRNORM | POLLWRBAND);
1534         rcu_read_unlock();
1535 }
1536
1537 /**
1538  * tipc_data_ready - wake up threads to indicate messages have been received
1539  * @sk: socket
1540  * @len: the length of messages
1541  */
1542 static void tipc_data_ready(struct sock *sk)
1543 {
1544         struct socket_wq *wq;
1545
1546         rcu_read_lock();
1547         wq = rcu_dereference(sk->sk_wq);
1548         if (wq_has_sleeper(wq))
1549                 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1550                                                 POLLRDNORM | POLLRDBAND);
1551         rcu_read_unlock();
1552 }
1553
1554 /**
1555  * filter_connect - Handle all incoming messages for a connection-based socket
1556  * @tsk: TIPC socket
1557  * @msg: message
1558  *
1559  * Returns 0 (TIPC_OK) if everything ok, -TIPC_ERR_NO_PORT otherwise
1560  */
1561 static int filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1562 {
1563         struct sock *sk = &tsk->sk;
1564         struct net *net = sock_net(sk);
1565         struct socket *sock = sk->sk_socket;
1566         struct tipc_msg *msg = buf_msg(*buf);
1567         int retval = -TIPC_ERR_NO_PORT;
1568
1569         if (msg_mcast(msg))
1570                 return retval;
1571
1572         switch ((int)sock->state) {
1573         case SS_CONNECTED:
1574                 /* Accept only connection-based messages sent by peer */
1575                 if (tsk_peer_msg(tsk, msg)) {
1576                         if (unlikely(msg_errcode(msg))) {
1577                                 sock->state = SS_DISCONNECTING;
1578                                 tsk->connected = 0;
1579                                 /* let timer expire on it's own */
1580                                 tipc_node_remove_conn(net, tsk_peer_node(tsk),
1581                                                       tsk->portid);
1582                         }
1583                         retval = TIPC_OK;
1584                 }
1585                 break;
1586         case SS_CONNECTING:
1587                 /* Accept only ACK or NACK message */
1588
1589                 if (unlikely(!msg_connected(msg)))
1590                         break;
1591
1592                 if (unlikely(msg_errcode(msg))) {
1593                         sock->state = SS_DISCONNECTING;
1594                         sk->sk_err = ECONNREFUSED;
1595                         retval = TIPC_OK;
1596                         break;
1597                 }
1598
1599                 if (unlikely(msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)) {
1600                         sock->state = SS_DISCONNECTING;
1601                         sk->sk_err = EINVAL;
1602                         retval = TIPC_OK;
1603                         break;
1604                 }
1605
1606                 tipc_sk_finish_conn(tsk, msg_origport(msg), msg_orignode(msg));
1607                 msg_set_importance(&tsk->phdr, msg_importance(msg));
1608                 sock->state = SS_CONNECTED;
1609
1610                 /* If an incoming message is an 'ACK-', it should be
1611                  * discarded here because it doesn't contain useful
1612                  * data. In addition, we should try to wake up
1613                  * connect() routine if sleeping.
1614                  */
1615                 if (msg_data_sz(msg) == 0) {
1616                         kfree_skb(*buf);
1617                         *buf = NULL;
1618                         if (waitqueue_active(sk_sleep(sk)))
1619                                 wake_up_interruptible(sk_sleep(sk));
1620                 }
1621                 retval = TIPC_OK;
1622                 break;
1623         case SS_LISTENING:
1624         case SS_UNCONNECTED:
1625                 /* Accept only SYN message */
1626                 if (!msg_connected(msg) && !(msg_errcode(msg)))
1627                         retval = TIPC_OK;
1628                 break;
1629         case SS_DISCONNECTING:
1630                 break;
1631         default:
1632                 pr_err("Unknown socket state %u\n", sock->state);
1633         }
1634         return retval;
1635 }
1636
1637 /**
1638  * rcvbuf_limit - get proper overload limit of socket receive queue
1639  * @sk: socket
1640  * @buf: message
1641  *
1642  * For all connection oriented messages, irrespective of importance,
1643  * the default overload value (i.e. 67MB) is set as limit.
1644  *
1645  * For all connectionless messages, by default new queue limits are
1646  * as belows:
1647  *
1648  * TIPC_LOW_IMPORTANCE       (4 MB)
1649  * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1650  * TIPC_HIGH_IMPORTANCE      (16 MB)
1651  * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1652  *
1653  * Returns overload limit according to corresponding message importance
1654  */
1655 static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1656 {
1657         struct tipc_msg *msg = buf_msg(buf);
1658
1659         if (msg_connected(msg))
1660                 return sysctl_tipc_rmem[2];
1661
1662         return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1663                 msg_importance(msg);
1664 }
1665
1666 /**
1667  * filter_rcv - validate incoming message
1668  * @sk: socket
1669  * @buf: message
1670  *
1671  * Enqueues message on receive queue if acceptable; optionally handles
1672  * disconnect indication for a connected socket.
1673  *
1674  * Called with socket lock already taken; port lock may also be taken.
1675  *
1676  * Returns 0 (TIPC_OK) if message was consumed, -TIPC error code if message
1677  * to be rejected, 1 (TIPC_FWD_MSG) if (CONN_MANAGER) message to be forwarded
1678  */
1679 static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1680 {
1681         struct socket *sock = sk->sk_socket;
1682         struct tipc_sock *tsk = tipc_sk(sk);
1683         struct tipc_msg *msg = buf_msg(buf);
1684         unsigned int limit = rcvbuf_limit(sk, buf);
1685         u32 onode;
1686         int rc = TIPC_OK;
1687
1688         if (unlikely(msg_user(msg) == CONN_MANAGER))
1689                 return tipc_sk_proto_rcv(tsk, &onode, buf);
1690
1691         if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1692                 kfree_skb(buf);
1693                 tsk->link_cong = 0;
1694                 sk->sk_write_space(sk);
1695                 return TIPC_OK;
1696         }
1697
1698         /* Reject message if it is wrong sort of message for socket */
1699         if (msg_type(msg) > TIPC_DIRECT_MSG)
1700                 return -TIPC_ERR_NO_PORT;
1701
1702         if (sock->state == SS_READY) {
1703                 if (msg_connected(msg))
1704                         return -TIPC_ERR_NO_PORT;
1705         } else {
1706                 rc = filter_connect(tsk, &buf);
1707                 if (rc != TIPC_OK || buf == NULL)
1708                         return rc;
1709         }
1710
1711         /* Reject message if there isn't room to queue it */
1712         if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1713                 return -TIPC_ERR_OVERLOAD;
1714
1715         /* Enqueue message */
1716         TIPC_SKB_CB(buf)->handle = NULL;
1717         __skb_queue_tail(&sk->sk_receive_queue, buf);
1718         skb_set_owner_r(buf, sk);
1719
1720         sk->sk_data_ready(sk);
1721         return TIPC_OK;
1722 }
1723
1724 /**
1725  * tipc_backlog_rcv - handle incoming message from backlog queue
1726  * @sk: socket
1727  * @skb: message
1728  *
1729  * Caller must hold socket lock, but not port lock.
1730  *
1731  * Returns 0
1732  */
1733 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1734 {
1735         int rc;
1736         u32 onode;
1737         struct tipc_sock *tsk = tipc_sk(sk);
1738         struct net *net = sock_net(sk);
1739         uint truesize = skb->truesize;
1740
1741         rc = filter_rcv(sk, skb);
1742
1743         if (likely(!rc)) {
1744                 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1745                         atomic_add(truesize, &tsk->dupl_rcvcnt);
1746                 return 0;
1747         }
1748
1749         if ((rc < 0) && !tipc_msg_reverse(net, skb, &onode, -rc))
1750                 return 0;
1751
1752         tipc_link_xmit_skb(net, skb, onode, 0);
1753
1754         return 0;
1755 }
1756
1757 /**
1758  * tipc_sk_rcv - handle incoming message
1759  * @skb: buffer containing arriving message
1760  * Consumes buffer
1761  * Returns 0 if success, or errno: -EHOSTUNREACH
1762  */
1763 int tipc_sk_rcv(struct net *net, struct sk_buff *skb)
1764 {
1765         struct tipc_sock *tsk;
1766         struct sock *sk;
1767         u32 dport = msg_destport(buf_msg(skb));
1768         int rc = TIPC_OK;
1769         uint limit;
1770         u32 dnode;
1771
1772         /* Validate destination and message */
1773         tsk = tipc_sk_lookup(net, dport);
1774         if (unlikely(!tsk)) {
1775                 rc = tipc_msg_eval(net, skb, &dnode);
1776                 goto exit;
1777         }
1778         sk = &tsk->sk;
1779
1780         /* Queue message */
1781         spin_lock_bh(&sk->sk_lock.slock);
1782
1783         if (!sock_owned_by_user(sk)) {
1784                 rc = filter_rcv(sk, skb);
1785         } else {
1786                 if (sk->sk_backlog.len == 0)
1787                         atomic_set(&tsk->dupl_rcvcnt, 0);
1788                 limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
1789                 if (sk_add_backlog(sk, skb, limit))
1790                         rc = -TIPC_ERR_OVERLOAD;
1791         }
1792         spin_unlock_bh(&sk->sk_lock.slock);
1793         sock_put(sk);
1794         if (likely(!rc))
1795                 return 0;
1796 exit:
1797         if ((rc < 0) && !tipc_msg_reverse(net, skb, &dnode, -rc))
1798                 return -EHOSTUNREACH;
1799
1800         tipc_link_xmit_skb(net, skb, dnode, 0);
1801         return (rc < 0) ? -EHOSTUNREACH : 0;
1802 }
1803
1804 static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1805 {
1806         struct sock *sk = sock->sk;
1807         DEFINE_WAIT(wait);
1808         int done;
1809
1810         do {
1811                 int err = sock_error(sk);
1812                 if (err)
1813                         return err;
1814                 if (!*timeo_p)
1815                         return -ETIMEDOUT;
1816                 if (signal_pending(current))
1817                         return sock_intr_errno(*timeo_p);
1818
1819                 prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1820                 done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1821                 finish_wait(sk_sleep(sk), &wait);
1822         } while (!done);
1823         return 0;
1824 }
1825
1826 /**
1827  * tipc_connect - establish a connection to another TIPC port
1828  * @sock: socket structure
1829  * @dest: socket address for destination port
1830  * @destlen: size of socket address data structure
1831  * @flags: file-related flags associated with socket
1832  *
1833  * Returns 0 on success, errno otherwise
1834  */
1835 static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1836                         int destlen, int flags)
1837 {
1838         struct sock *sk = sock->sk;
1839         struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1840         struct msghdr m = {NULL,};
1841         long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1842         socket_state previous;
1843         int res;
1844
1845         lock_sock(sk);
1846
1847         /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1848         if (sock->state == SS_READY) {
1849                 res = -EOPNOTSUPP;
1850                 goto exit;
1851         }
1852
1853         /*
1854          * Reject connection attempt using multicast address
1855          *
1856          * Note: send_msg() validates the rest of the address fields,
1857          *       so there's no need to do it here
1858          */
1859         if (dst->addrtype == TIPC_ADDR_MCAST) {
1860                 res = -EINVAL;
1861                 goto exit;
1862         }
1863
1864         previous = sock->state;
1865         switch (sock->state) {
1866         case SS_UNCONNECTED:
1867                 /* Send a 'SYN-' to destination */
1868                 m.msg_name = dest;
1869                 m.msg_namelen = destlen;
1870
1871                 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1872                  * indicate send_msg() is never blocked.
1873                  */
1874                 if (!timeout)
1875                         m.msg_flags = MSG_DONTWAIT;
1876
1877                 res = tipc_sendmsg(NULL, sock, &m, 0);
1878                 if ((res < 0) && (res != -EWOULDBLOCK))
1879                         goto exit;
1880
1881                 /* Just entered SS_CONNECTING state; the only
1882                  * difference is that return value in non-blocking
1883                  * case is EINPROGRESS, rather than EALREADY.
1884                  */
1885                 res = -EINPROGRESS;
1886         case SS_CONNECTING:
1887                 if (previous == SS_CONNECTING)
1888                         res = -EALREADY;
1889                 if (!timeout)
1890                         goto exit;
1891                 timeout = msecs_to_jiffies(timeout);
1892                 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1893                 res = tipc_wait_for_connect(sock, &timeout);
1894                 break;
1895         case SS_CONNECTED:
1896                 res = -EISCONN;
1897                 break;
1898         default:
1899                 res = -EINVAL;
1900                 break;
1901         }
1902 exit:
1903         release_sock(sk);
1904         return res;
1905 }
1906
1907 /**
1908  * tipc_listen - allow socket to listen for incoming connections
1909  * @sock: socket structure
1910  * @len: (unused)
1911  *
1912  * Returns 0 on success, errno otherwise
1913  */
1914 static int tipc_listen(struct socket *sock, int len)
1915 {
1916         struct sock *sk = sock->sk;
1917         int res;
1918
1919         lock_sock(sk);
1920
1921         if (sock->state != SS_UNCONNECTED)
1922                 res = -EINVAL;
1923         else {
1924                 sock->state = SS_LISTENING;
1925                 res = 0;
1926         }
1927
1928         release_sock(sk);
1929         return res;
1930 }
1931
1932 static int tipc_wait_for_accept(struct socket *sock, long timeo)
1933 {
1934         struct sock *sk = sock->sk;
1935         DEFINE_WAIT(wait);
1936         int err;
1937
1938         /* True wake-one mechanism for incoming connections: only
1939          * one process gets woken up, not the 'whole herd'.
1940          * Since we do not 'race & poll' for established sockets
1941          * anymore, the common case will execute the loop only once.
1942         */
1943         for (;;) {
1944                 prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1945                                           TASK_INTERRUPTIBLE);
1946                 if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1947                         release_sock(sk);
1948                         timeo = schedule_timeout(timeo);
1949                         lock_sock(sk);
1950                 }
1951                 err = 0;
1952                 if (!skb_queue_empty(&sk->sk_receive_queue))
1953                         break;
1954                 err = -EINVAL;
1955                 if (sock->state != SS_LISTENING)
1956                         break;
1957                 err = sock_intr_errno(timeo);
1958                 if (signal_pending(current))
1959                         break;
1960                 err = -EAGAIN;
1961                 if (!timeo)
1962                         break;
1963         }
1964         finish_wait(sk_sleep(sk), &wait);
1965         return err;
1966 }
1967
1968 /**
1969  * tipc_accept - wait for connection request
1970  * @sock: listening socket
1971  * @newsock: new socket that is to be connected
1972  * @flags: file-related flags associated with socket
1973  *
1974  * Returns 0 on success, errno otherwise
1975  */
1976 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1977 {
1978         struct sock *new_sk, *sk = sock->sk;
1979         struct sk_buff *buf;
1980         struct tipc_sock *new_tsock;
1981         struct tipc_msg *msg;
1982         long timeo;
1983         int res;
1984
1985         lock_sock(sk);
1986
1987         if (sock->state != SS_LISTENING) {
1988                 res = -EINVAL;
1989                 goto exit;
1990         }
1991         timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1992         res = tipc_wait_for_accept(sock, timeo);
1993         if (res)
1994                 goto exit;
1995
1996         buf = skb_peek(&sk->sk_receive_queue);
1997
1998         res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1999         if (res)
2000                 goto exit;
2001
2002         new_sk = new_sock->sk;
2003         new_tsock = tipc_sk(new_sk);
2004         msg = buf_msg(buf);
2005
2006         /* we lock on new_sk; but lockdep sees the lock on sk */
2007         lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2008
2009         /*
2010          * Reject any stray messages received by new socket
2011          * before the socket lock was taken (very, very unlikely)
2012          */
2013         tsk_rej_rx_queue(new_sk);
2014
2015         /* Connect new socket to it's peer */
2016         tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2017         new_sock->state = SS_CONNECTED;
2018
2019         tsk_set_importance(new_tsock, msg_importance(msg));
2020         if (msg_named(msg)) {
2021                 new_tsock->conn_type = msg_nametype(msg);
2022                 new_tsock->conn_instance = msg_nameinst(msg);
2023         }
2024
2025         /*
2026          * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2027          * Respond to 'SYN+' by queuing it on new socket.
2028          */
2029         if (!msg_data_sz(msg)) {
2030                 struct msghdr m = {NULL,};
2031
2032                 tsk_advance_rx_queue(sk);
2033                 tipc_send_packet(NULL, new_sock, &m, 0);
2034         } else {
2035                 __skb_dequeue(&sk->sk_receive_queue);
2036                 __skb_queue_head(&new_sk->sk_receive_queue, buf);
2037                 skb_set_owner_r(buf, new_sk);
2038         }
2039         release_sock(new_sk);
2040 exit:
2041         release_sock(sk);
2042         return res;
2043 }
2044
2045 /**
2046  * tipc_shutdown - shutdown socket connection
2047  * @sock: socket structure
2048  * @how: direction to close (must be SHUT_RDWR)
2049  *
2050  * Terminates connection (if necessary), then purges socket's receive queue.
2051  *
2052  * Returns 0 on success, errno otherwise
2053  */
2054 static int tipc_shutdown(struct socket *sock, int how)
2055 {
2056         struct sock *sk = sock->sk;
2057         struct net *net = sock_net(sk);
2058         struct tipc_net *tn = net_generic(net, tipc_net_id);
2059         struct tipc_sock *tsk = tipc_sk(sk);
2060         struct sk_buff *skb;
2061         u32 dnode;
2062         int res;
2063
2064         if (how != SHUT_RDWR)
2065                 return -EINVAL;
2066
2067         lock_sock(sk);
2068
2069         switch (sock->state) {
2070         case SS_CONNECTING:
2071         case SS_CONNECTED:
2072
2073 restart:
2074                 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2075                 skb = __skb_dequeue(&sk->sk_receive_queue);
2076                 if (skb) {
2077                         if (TIPC_SKB_CB(skb)->handle != NULL) {
2078                                 kfree_skb(skb);
2079                                 goto restart;
2080                         }
2081                         if (tipc_msg_reverse(net, skb, &dnode,
2082                                              TIPC_CONN_SHUTDOWN))
2083                                 tipc_link_xmit_skb(net, skb, dnode,
2084                                                    tsk->portid);
2085                         tipc_node_remove_conn(net, dnode, tsk->portid);
2086                 } else {
2087                         dnode = tsk_peer_node(tsk);
2088                         skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
2089                                               TIPC_CONN_MSG, SHORT_H_SIZE,
2090                                               0, dnode, tn->own_addr,
2091                                               tsk_peer_port(tsk),
2092                                               tsk->portid, TIPC_CONN_SHUTDOWN);
2093                         tipc_link_xmit_skb(net, skb, dnode, tsk->portid);
2094                 }
2095                 tsk->connected = 0;
2096                 sock->state = SS_DISCONNECTING;
2097                 tipc_node_remove_conn(net, dnode, tsk->portid);
2098                 /* fall through */
2099
2100         case SS_DISCONNECTING:
2101
2102                 /* Discard any unreceived messages */
2103                 __skb_queue_purge(&sk->sk_receive_queue);
2104
2105                 /* Wake up anyone sleeping in poll */
2106                 sk->sk_state_change(sk);
2107                 res = 0;
2108                 break;
2109
2110         default:
2111                 res = -ENOTCONN;
2112         }
2113
2114         release_sock(sk);
2115         return res;
2116 }
2117
2118 static void tipc_sk_timeout(unsigned long data)
2119 {
2120         struct tipc_sock *tsk = (struct tipc_sock *)data;
2121         struct sock *sk = &tsk->sk;
2122         struct net *net = sock_net(sk);
2123         struct tipc_net *tn = net_generic(net, tipc_net_id);
2124         struct sk_buff *skb = NULL;
2125         u32 peer_port, peer_node;
2126
2127         bh_lock_sock(sk);
2128         if (!tsk->connected) {
2129                 bh_unlock_sock(sk);
2130                 goto exit;
2131         }
2132         peer_port = tsk_peer_port(tsk);
2133         peer_node = tsk_peer_node(tsk);
2134
2135         if (tsk->probing_state == TIPC_CONN_PROBING) {
2136                 /* Previous probe not answered -> self abort */
2137                 skb = tipc_msg_create(net, TIPC_CRITICAL_IMPORTANCE,
2138                                       TIPC_CONN_MSG, SHORT_H_SIZE, 0,
2139                                       tn->own_addr, peer_node, tsk->portid,
2140                                       peer_port, TIPC_ERR_NO_PORT);
2141         } else {
2142                 skb = tipc_msg_create(net, CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
2143                                       0, peer_node, tn->own_addr,
2144                                       peer_port, tsk->portid, TIPC_OK);
2145                 tsk->probing_state = TIPC_CONN_PROBING;
2146                 sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2147         }
2148         bh_unlock_sock(sk);
2149         if (skb)
2150                 tipc_link_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2151 exit:
2152         sock_put(sk);
2153 }
2154
2155 static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2156                            struct tipc_name_seq const *seq)
2157 {
2158         struct net *net = sock_net(&tsk->sk);
2159         struct publication *publ;
2160         u32 key;
2161
2162         if (tsk->connected)
2163                 return -EINVAL;
2164         key = tsk->portid + tsk->pub_count + 1;
2165         if (key == tsk->portid)
2166                 return -EADDRINUSE;
2167
2168         publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2169                                     scope, tsk->portid, key);
2170         if (unlikely(!publ))
2171                 return -EINVAL;
2172
2173         list_add(&publ->pport_list, &tsk->publications);
2174         tsk->pub_count++;
2175         tsk->published = 1;
2176         return 0;
2177 }
2178
2179 static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2180                             struct tipc_name_seq const *seq)
2181 {
2182         struct net *net = sock_net(&tsk->sk);
2183         struct publication *publ;
2184         struct publication *safe;
2185         int rc = -EINVAL;
2186
2187         list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2188                 if (seq) {
2189                         if (publ->scope != scope)
2190                                 continue;
2191                         if (publ->type != seq->type)
2192                                 continue;
2193                         if (publ->lower != seq->lower)
2194                                 continue;
2195                         if (publ->upper != seq->upper)
2196                                 break;
2197                         tipc_nametbl_withdraw(net, publ->type, publ->lower,
2198                                               publ->ref, publ->key);
2199                         rc = 0;
2200                         break;
2201                 }
2202                 tipc_nametbl_withdraw(net, publ->type, publ->lower,
2203                                       publ->ref, publ->key);
2204                 rc = 0;
2205         }
2206         if (list_empty(&tsk->publications))
2207                 tsk->published = 0;
2208         return rc;
2209 }
2210
2211 static int tipc_sk_show(struct tipc_sock *tsk, char *buf,
2212                         int len, int full_id)
2213 {
2214         struct net *net = sock_net(&tsk->sk);
2215         struct tipc_net *tn = net_generic(net, tipc_net_id);
2216         struct publication *publ;
2217         int ret;
2218
2219         if (full_id)
2220                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
2221                                     tipc_zone(tn->own_addr),
2222                                     tipc_cluster(tn->own_addr),
2223                                     tipc_node(tn->own_addr), tsk->portid);
2224         else
2225                 ret = tipc_snprintf(buf, len, "%-10u:", tsk->portid);
2226
2227         if (tsk->connected) {
2228                 u32 dport = tsk_peer_port(tsk);
2229                 u32 destnode = tsk_peer_node(tsk);
2230
2231                 ret += tipc_snprintf(buf + ret, len - ret,
2232                                      " connected to <%u.%u.%u:%u>",
2233                                      tipc_zone(destnode),
2234                                      tipc_cluster(destnode),
2235                                      tipc_node(destnode), dport);
2236                 if (tsk->conn_type != 0)
2237                         ret += tipc_snprintf(buf + ret, len - ret,
2238                                              " via {%u,%u}", tsk->conn_type,
2239                                              tsk->conn_instance);
2240         } else if (tsk->published) {
2241                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
2242                 list_for_each_entry(publ, &tsk->publications, pport_list) {
2243                         if (publ->lower == publ->upper)
2244                                 ret += tipc_snprintf(buf + ret, len - ret,
2245                                                      " {%u,%u}", publ->type,
2246                                                      publ->lower);
2247                         else
2248                                 ret += tipc_snprintf(buf + ret, len - ret,
2249                                                      " {%u,%u,%u}", publ->type,
2250                                                      publ->lower, publ->upper);
2251                 }
2252         }
2253         ret += tipc_snprintf(buf + ret, len - ret, "\n");
2254         return ret;
2255 }
2256
2257 struct sk_buff *tipc_sk_socks_show(struct net *net)
2258 {
2259         struct tipc_net *tn = net_generic(net, tipc_net_id);
2260         const struct bucket_table *tbl;
2261         struct rhash_head *pos;
2262         struct sk_buff *buf;
2263         struct tlv_desc *rep_tlv;
2264         char *pb;
2265         int pb_len;
2266         struct tipc_sock *tsk;
2267         int str_len = 0;
2268         int i;
2269
2270         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
2271         if (!buf)
2272                 return NULL;
2273         rep_tlv = (struct tlv_desc *)buf->data;
2274         pb = TLV_DATA(rep_tlv);
2275         pb_len = ULTRA_STRING_MAX_LEN;
2276
2277         rcu_read_lock();
2278         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2279         for (i = 0; i < tbl->size; i++) {
2280                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2281                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2282                         str_len += tipc_sk_show(tsk, pb + str_len,
2283                                                 pb_len - str_len, 0);
2284                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2285                 }
2286         }
2287         rcu_read_unlock();
2288
2289         str_len += 1;   /* for "\0" */
2290         skb_put(buf, TLV_SPACE(str_len));
2291         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
2292
2293         return buf;
2294 }
2295
2296 /* tipc_sk_reinit: set non-zero address in all existing sockets
2297  *                 when we go from standalone to network mode.
2298  */
2299 void tipc_sk_reinit(struct net *net)
2300 {
2301         struct tipc_net *tn = net_generic(net, tipc_net_id);
2302         const struct bucket_table *tbl;
2303         struct rhash_head *pos;
2304         struct tipc_sock *tsk;
2305         struct tipc_msg *msg;
2306         int i;
2307
2308         rcu_read_lock();
2309         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2310         for (i = 0; i < tbl->size; i++) {
2311                 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2312                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2313                         msg = &tsk->phdr;
2314                         msg_set_prevnode(msg, tn->own_addr);
2315                         msg_set_orignode(msg, tn->own_addr);
2316                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2317                 }
2318         }
2319         rcu_read_unlock();
2320 }
2321
2322 static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2323 {
2324         struct tipc_net *tn = net_generic(net, tipc_net_id);
2325         struct tipc_sock *tsk;
2326
2327         rcu_read_lock();
2328         tsk = rhashtable_lookup(&tn->sk_rht, &portid);
2329         if (tsk)
2330                 sock_hold(&tsk->sk);
2331         rcu_read_unlock();
2332
2333         return tsk;
2334 }
2335
2336 static int tipc_sk_insert(struct tipc_sock *tsk)
2337 {
2338         struct sock *sk = &tsk->sk;
2339         struct net *net = sock_net(sk);
2340         struct tipc_net *tn = net_generic(net, tipc_net_id);
2341         u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2342         u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2343
2344         while (remaining--) {
2345                 portid++;
2346                 if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2347                         portid = TIPC_MIN_PORT;
2348                 tsk->portid = portid;
2349                 sock_hold(&tsk->sk);
2350                 if (rhashtable_lookup_insert(&tn->sk_rht, &tsk->node))
2351                         return 0;
2352                 sock_put(&tsk->sk);
2353         }
2354
2355         return -1;
2356 }
2357
2358 static void tipc_sk_remove(struct tipc_sock *tsk)
2359 {
2360         struct sock *sk = &tsk->sk;
2361         struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2362
2363         if (rhashtable_remove(&tn->sk_rht, &tsk->node)) {
2364                 WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2365                 __sock_put(sk);
2366         }
2367 }
2368
2369 int tipc_sk_rht_init(struct net *net)
2370 {
2371         struct tipc_net *tn = net_generic(net, tipc_net_id);
2372         struct rhashtable_params rht_params = {
2373                 .nelem_hint = 192,
2374                 .head_offset = offsetof(struct tipc_sock, node),
2375                 .key_offset = offsetof(struct tipc_sock, portid),
2376                 .key_len = sizeof(u32), /* portid */
2377                 .hashfn = jhash,
2378                 .max_shift = 20, /* 1M */
2379                 .min_shift = 8,  /* 256 */
2380                 .grow_decision = rht_grow_above_75,
2381                 .shrink_decision = rht_shrink_below_30,
2382         };
2383
2384         return rhashtable_init(&tn->sk_rht, &rht_params);
2385 }
2386
2387 void tipc_sk_rht_destroy(struct net *net)
2388 {
2389         struct tipc_net *tn = net_generic(net, tipc_net_id);
2390
2391         /* Wait for socket readers to complete */
2392         synchronize_net();
2393
2394         rhashtable_destroy(&tn->sk_rht);
2395 }
2396
2397 /**
2398  * tipc_setsockopt - set socket option
2399  * @sock: socket structure
2400  * @lvl: option level
2401  * @opt: option identifier
2402  * @ov: pointer to new option value
2403  * @ol: length of option value
2404  *
2405  * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2406  * (to ease compatibility).
2407  *
2408  * Returns 0 on success, errno otherwise
2409  */
2410 static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2411                            char __user *ov, unsigned int ol)
2412 {
2413         struct sock *sk = sock->sk;
2414         struct tipc_sock *tsk = tipc_sk(sk);
2415         u32 value;
2416         int res;
2417
2418         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2419                 return 0;
2420         if (lvl != SOL_TIPC)
2421                 return -ENOPROTOOPT;
2422         if (ol < sizeof(value))
2423                 return -EINVAL;
2424         res = get_user(value, (u32 __user *)ov);
2425         if (res)
2426                 return res;
2427
2428         lock_sock(sk);
2429
2430         switch (opt) {
2431         case TIPC_IMPORTANCE:
2432                 res = tsk_set_importance(tsk, value);
2433                 break;
2434         case TIPC_SRC_DROPPABLE:
2435                 if (sock->type != SOCK_STREAM)
2436                         tsk_set_unreliable(tsk, value);
2437                 else
2438                         res = -ENOPROTOOPT;
2439                 break;
2440         case TIPC_DEST_DROPPABLE:
2441                 tsk_set_unreturnable(tsk, value);
2442                 break;
2443         case TIPC_CONN_TIMEOUT:
2444                 tipc_sk(sk)->conn_timeout = value;
2445                 /* no need to set "res", since already 0 at this point */
2446                 break;
2447         default:
2448                 res = -EINVAL;
2449         }
2450
2451         release_sock(sk);
2452
2453         return res;
2454 }
2455
2456 /**
2457  * tipc_getsockopt - get socket option
2458  * @sock: socket structure
2459  * @lvl: option level
2460  * @opt: option identifier
2461  * @ov: receptacle for option value
2462  * @ol: receptacle for length of option value
2463  *
2464  * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2465  * (to ease compatibility).
2466  *
2467  * Returns 0 on success, errno otherwise
2468  */
2469 static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2470                            char __user *ov, int __user *ol)
2471 {
2472         struct sock *sk = sock->sk;
2473         struct tipc_sock *tsk = tipc_sk(sk);
2474         int len;
2475         u32 value;
2476         int res;
2477
2478         if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2479                 return put_user(0, ol);
2480         if (lvl != SOL_TIPC)
2481                 return -ENOPROTOOPT;
2482         res = get_user(len, ol);
2483         if (res)
2484                 return res;
2485
2486         lock_sock(sk);
2487
2488         switch (opt) {
2489         case TIPC_IMPORTANCE:
2490                 value = tsk_importance(tsk);
2491                 break;
2492         case TIPC_SRC_DROPPABLE:
2493                 value = tsk_unreliable(tsk);
2494                 break;
2495         case TIPC_DEST_DROPPABLE:
2496                 value = tsk_unreturnable(tsk);
2497                 break;
2498         case TIPC_CONN_TIMEOUT:
2499                 value = tsk->conn_timeout;
2500                 /* no need to set "res", since already 0 at this point */
2501                 break;
2502         case TIPC_NODE_RECVQ_DEPTH:
2503                 value = 0; /* was tipc_queue_size, now obsolete */
2504                 break;
2505         case TIPC_SOCK_RECVQ_DEPTH:
2506                 value = skb_queue_len(&sk->sk_receive_queue);
2507                 break;
2508         default:
2509                 res = -EINVAL;
2510         }
2511
2512         release_sock(sk);
2513
2514         if (res)
2515                 return res;     /* "get" failed */
2516
2517         if (len < sizeof(value))
2518                 return -EINVAL;
2519
2520         if (copy_to_user(ov, &value, sizeof(value)))
2521                 return -EFAULT;
2522
2523         return put_user(sizeof(value), ol);
2524 }
2525
2526 static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2527 {
2528         struct sock *sk = sock->sk;
2529         struct tipc_sioc_ln_req lnr;
2530         void __user *argp = (void __user *)arg;
2531
2532         switch (cmd) {
2533         case SIOCGETLINKNAME:
2534                 if (copy_from_user(&lnr, argp, sizeof(lnr)))
2535                         return -EFAULT;
2536                 if (!tipc_node_get_linkname(sock_net(sk),
2537                                             lnr.bearer_id & 0xffff, lnr.peer,
2538                                             lnr.linkname, TIPC_MAX_LINK_NAME)) {
2539                         if (copy_to_user(argp, &lnr, sizeof(lnr)))
2540                                 return -EFAULT;
2541                         return 0;
2542                 }
2543                 return -EADDRNOTAVAIL;
2544         default:
2545                 return -ENOIOCTLCMD;
2546         }
2547 }
2548
2549 /* Protocol switches for the various types of TIPC sockets */
2550
2551 static const struct proto_ops msg_ops = {
2552         .owner          = THIS_MODULE,
2553         .family         = AF_TIPC,
2554         .release        = tipc_release,
2555         .bind           = tipc_bind,
2556         .connect        = tipc_connect,
2557         .socketpair     = sock_no_socketpair,
2558         .accept         = sock_no_accept,
2559         .getname        = tipc_getname,
2560         .poll           = tipc_poll,
2561         .ioctl          = tipc_ioctl,
2562         .listen         = sock_no_listen,
2563         .shutdown       = tipc_shutdown,
2564         .setsockopt     = tipc_setsockopt,
2565         .getsockopt     = tipc_getsockopt,
2566         .sendmsg        = tipc_sendmsg,
2567         .recvmsg        = tipc_recvmsg,
2568         .mmap           = sock_no_mmap,
2569         .sendpage       = sock_no_sendpage
2570 };
2571
2572 static const struct proto_ops packet_ops = {
2573         .owner          = THIS_MODULE,
2574         .family         = AF_TIPC,
2575         .release        = tipc_release,
2576         .bind           = tipc_bind,
2577         .connect        = tipc_connect,
2578         .socketpair     = sock_no_socketpair,
2579         .accept         = tipc_accept,
2580         .getname        = tipc_getname,
2581         .poll           = tipc_poll,
2582         .ioctl          = tipc_ioctl,
2583         .listen         = tipc_listen,
2584         .shutdown       = tipc_shutdown,
2585         .setsockopt     = tipc_setsockopt,
2586         .getsockopt     = tipc_getsockopt,
2587         .sendmsg        = tipc_send_packet,
2588         .recvmsg        = tipc_recvmsg,
2589         .mmap           = sock_no_mmap,
2590         .sendpage       = sock_no_sendpage
2591 };
2592
2593 static const struct proto_ops stream_ops = {
2594         .owner          = THIS_MODULE,
2595         .family         = AF_TIPC,
2596         .release        = tipc_release,
2597         .bind           = tipc_bind,
2598         .connect        = tipc_connect,
2599         .socketpair     = sock_no_socketpair,
2600         .accept         = tipc_accept,
2601         .getname        = tipc_getname,
2602         .poll           = tipc_poll,
2603         .ioctl          = tipc_ioctl,
2604         .listen         = tipc_listen,
2605         .shutdown       = tipc_shutdown,
2606         .setsockopt     = tipc_setsockopt,
2607         .getsockopt     = tipc_getsockopt,
2608         .sendmsg        = tipc_send_stream,
2609         .recvmsg        = tipc_recv_stream,
2610         .mmap           = sock_no_mmap,
2611         .sendpage       = sock_no_sendpage
2612 };
2613
2614 static const struct net_proto_family tipc_family_ops = {
2615         .owner          = THIS_MODULE,
2616         .family         = AF_TIPC,
2617         .create         = tipc_sk_create
2618 };
2619
2620 static struct proto tipc_proto = {
2621         .name           = "TIPC",
2622         .owner          = THIS_MODULE,
2623         .obj_size       = sizeof(struct tipc_sock),
2624         .sysctl_rmem    = sysctl_tipc_rmem
2625 };
2626
2627 static struct proto tipc_proto_kern = {
2628         .name           = "TIPC",
2629         .obj_size       = sizeof(struct tipc_sock),
2630         .sysctl_rmem    = sysctl_tipc_rmem
2631 };
2632
2633 /**
2634  * tipc_socket_init - initialize TIPC socket interface
2635  *
2636  * Returns 0 on success, errno otherwise
2637  */
2638 int tipc_socket_init(void)
2639 {
2640         int res;
2641
2642         res = proto_register(&tipc_proto, 1);
2643         if (res) {
2644                 pr_err("Failed to register TIPC protocol type\n");
2645                 goto out;
2646         }
2647
2648         res = sock_register(&tipc_family_ops);
2649         if (res) {
2650                 pr_err("Failed to register TIPC socket type\n");
2651                 proto_unregister(&tipc_proto);
2652                 goto out;
2653         }
2654  out:
2655         return res;
2656 }
2657
2658 /**
2659  * tipc_socket_stop - stop TIPC socket interface
2660  */
2661 void tipc_socket_stop(void)
2662 {
2663         sock_unregister(tipc_family_ops.family);
2664         proto_unregister(&tipc_proto);
2665 }
2666
2667 /* Caller should hold socket lock for the passed tipc socket. */
2668 static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2669 {
2670         u32 peer_node;
2671         u32 peer_port;
2672         struct nlattr *nest;
2673
2674         peer_node = tsk_peer_node(tsk);
2675         peer_port = tsk_peer_port(tsk);
2676
2677         nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2678
2679         if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2680                 goto msg_full;
2681         if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2682                 goto msg_full;
2683
2684         if (tsk->conn_type != 0) {
2685                 if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2686                         goto msg_full;
2687                 if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2688                         goto msg_full;
2689                 if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2690                         goto msg_full;
2691         }
2692         nla_nest_end(skb, nest);
2693
2694         return 0;
2695
2696 msg_full:
2697         nla_nest_cancel(skb, nest);
2698
2699         return -EMSGSIZE;
2700 }
2701
2702 /* Caller should hold socket lock for the passed tipc socket. */
2703 static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2704                             struct tipc_sock *tsk)
2705 {
2706         int err;
2707         void *hdr;
2708         struct nlattr *attrs;
2709         struct net *net = sock_net(skb->sk);
2710         struct tipc_net *tn = net_generic(net, tipc_net_id);
2711
2712         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2713                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2714         if (!hdr)
2715                 goto msg_cancel;
2716
2717         attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2718         if (!attrs)
2719                 goto genlmsg_cancel;
2720         if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2721                 goto attr_msg_cancel;
2722         if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2723                 goto attr_msg_cancel;
2724
2725         if (tsk->connected) {
2726                 err = __tipc_nl_add_sk_con(skb, tsk);
2727                 if (err)
2728                         goto attr_msg_cancel;
2729         } else if (!list_empty(&tsk->publications)) {
2730                 if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2731                         goto attr_msg_cancel;
2732         }
2733         nla_nest_end(skb, attrs);
2734         genlmsg_end(skb, hdr);
2735
2736         return 0;
2737
2738 attr_msg_cancel:
2739         nla_nest_cancel(skb, attrs);
2740 genlmsg_cancel:
2741         genlmsg_cancel(skb, hdr);
2742 msg_cancel:
2743         return -EMSGSIZE;
2744 }
2745
2746 int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2747 {
2748         int err;
2749         struct tipc_sock *tsk;
2750         const struct bucket_table *tbl;
2751         struct rhash_head *pos;
2752         struct net *net = sock_net(skb->sk);
2753         struct tipc_net *tn = net_generic(net, tipc_net_id);
2754         u32 tbl_id = cb->args[0];
2755         u32 prev_portid = cb->args[1];
2756
2757         rcu_read_lock();
2758         tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2759         for (; tbl_id < tbl->size; tbl_id++) {
2760                 rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2761                         spin_lock_bh(&tsk->sk.sk_lock.slock);
2762                         if (prev_portid && prev_portid != tsk->portid) {
2763                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2764                                 continue;
2765                         }
2766
2767                         err = __tipc_nl_add_sk(skb, cb, tsk);
2768                         if (err) {
2769                                 prev_portid = tsk->portid;
2770                                 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2771                                 goto out;
2772                         }
2773                         prev_portid = 0;
2774                         spin_unlock_bh(&tsk->sk.sk_lock.slock);
2775                 }
2776         }
2777 out:
2778         rcu_read_unlock();
2779         cb->args[0] = tbl_id;
2780         cb->args[1] = prev_portid;
2781
2782         return skb->len;
2783 }
2784
2785 /* Caller should hold socket lock for the passed tipc socket. */
2786 static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2787                                  struct netlink_callback *cb,
2788                                  struct publication *publ)
2789 {
2790         void *hdr;
2791         struct nlattr *attrs;
2792
2793         hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2794                           &tipc_genl_v2_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2795         if (!hdr)
2796                 goto msg_cancel;
2797
2798         attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2799         if (!attrs)
2800                 goto genlmsg_cancel;
2801
2802         if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2803                 goto attr_msg_cancel;
2804         if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2805                 goto attr_msg_cancel;
2806         if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2807                 goto attr_msg_cancel;
2808         if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2809                 goto attr_msg_cancel;
2810
2811         nla_nest_end(skb, attrs);
2812         genlmsg_end(skb, hdr);
2813
2814         return 0;
2815
2816 attr_msg_cancel:
2817         nla_nest_cancel(skb, attrs);
2818 genlmsg_cancel:
2819         genlmsg_cancel(skb, hdr);
2820 msg_cancel:
2821         return -EMSGSIZE;
2822 }
2823
2824 /* Caller should hold socket lock for the passed tipc socket. */
2825 static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2826                                   struct netlink_callback *cb,
2827                                   struct tipc_sock *tsk, u32 *last_publ)
2828 {
2829         int err;
2830         struct publication *p;
2831
2832         if (*last_publ) {
2833                 list_for_each_entry(p, &tsk->publications, pport_list) {
2834                         if (p->key == *last_publ)
2835                                 break;
2836                 }
2837                 if (p->key != *last_publ) {
2838                         /* We never set seq or call nl_dump_check_consistent()
2839                          * this means that setting prev_seq here will cause the
2840                          * consistence check to fail in the netlink callback
2841                          * handler. Resulting in the last NLMSG_DONE message
2842                          * having the NLM_F_DUMP_INTR flag set.
2843                          */
2844                         cb->prev_seq = 1;
2845                         *last_publ = 0;
2846                         return -EPIPE;
2847                 }
2848         } else {
2849                 p = list_first_entry(&tsk->publications, struct publication,
2850                                      pport_list);
2851         }
2852
2853         list_for_each_entry_from(p, &tsk->publications, pport_list) {
2854                 err = __tipc_nl_add_sk_publ(skb, cb, p);
2855                 if (err) {
2856                         *last_publ = p->key;
2857                         return err;
2858                 }
2859         }
2860         *last_publ = 0;
2861
2862         return 0;
2863 }
2864
2865 int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2866 {
2867         int err;
2868         u32 tsk_portid = cb->args[0];
2869         u32 last_publ = cb->args[1];
2870         u32 done = cb->args[2];
2871         struct net *net = sock_net(skb->sk);
2872         struct tipc_sock *tsk;
2873
2874         if (!tsk_portid) {
2875                 struct nlattr **attrs;
2876                 struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2877
2878                 err = tipc_nlmsg_parse(cb->nlh, &attrs);
2879                 if (err)
2880                         return err;
2881
2882                 err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2883                                        attrs[TIPC_NLA_SOCK],
2884                                        tipc_nl_sock_policy);
2885                 if (err)
2886                         return err;
2887
2888                 if (!sock[TIPC_NLA_SOCK_REF])
2889                         return -EINVAL;
2890
2891                 tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2892         }
2893
2894         if (done)
2895                 return 0;
2896
2897         tsk = tipc_sk_lookup(net, tsk_portid);
2898         if (!tsk)
2899                 return -EINVAL;
2900
2901         lock_sock(&tsk->sk);
2902         err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2903         if (!err)
2904                 done = 1;
2905         release_sock(&tsk->sk);
2906         sock_put(&tsk->sk);
2907
2908         cb->args[0] = tsk_portid;
2909         cb->args[1] = last_publ;
2910         cb->args[2] = done;
2911
2912         return skb->len;
2913 }