tipc: move bcast definitions to bcast.c
[cascardo/linux.git] / net / tipc / bcast.c
1 /*
2  * net/tipc/bcast.c: TIPC broadcast code
3  *
4  * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
5  * Copyright (c) 2004, Intel Corporation.
6  * Copyright (c) 2005, 2010-2011, Wind River Systems
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  * 3. Neither the names of the copyright holders nor the names of its
18  *    contributors may be used to endorse or promote products derived from
19  *    this software without specific prior written permission.
20  *
21  * Alternatively, this software may be distributed under the terms of the
22  * GNU General Public License ("GPL") version 2 as published by the Free
23  * Software Foundation.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35  * POSSIBILITY OF SUCH DAMAGE.
36  */
37
38 #include <linux/tipc_config.h>
39 #include "socket.h"
40 #include "msg.h"
41 #include "bcast.h"
42 #include "name_distr.h"
43 #include "link.h"
44 #include "node.h"
45
46 #define MAX_PKT_DEFAULT_MCAST   1500    /* bcast link max packet size (fixed) */
47 #define BCLINK_WIN_DEFAULT      50      /* bcast link window size (default) */
48 #define BCLINK_WIN_MIN          32      /* bcast minimum link window size */
49
50 const char tipc_bclink_name[] = "broadcast-link";
51
52 /**
53  * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
54  * @primary: pointer to primary bearer
55  * @secondary: pointer to secondary bearer
56  *
57  * Bearers must have same priority and same set of reachable destinations
58  * to be paired.
59  */
60
61 struct tipc_bcbearer_pair {
62         struct tipc_bearer *primary;
63         struct tipc_bearer *secondary;
64 };
65
66 #define BCBEARER                MAX_BEARERS
67
68 /**
69  * struct tipc_bcbearer - bearer used by broadcast link
70  * @bearer: (non-standard) broadcast bearer structure
71  * @media: (non-standard) broadcast media structure
72  * @bpairs: array of bearer pairs
73  * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
74  * @remains: temporary node map used by tipc_bcbearer_send()
75  * @remains_new: temporary node map used tipc_bcbearer_send()
76  *
77  * Note: The fields labelled "temporary" are incorporated into the bearer
78  * to avoid consuming potentially limited stack space through the use of
79  * large local variables within multicast routines.  Concurrent access is
80  * prevented through use of the spinlock "bcast_lock".
81  */
82 struct tipc_bcbearer {
83         struct tipc_bearer bearer;
84         struct tipc_media media;
85         struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
86         struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
87         struct tipc_node_map remains;
88         struct tipc_node_map remains_new;
89 };
90
91 /**
92  * struct tipc_bc_base - link used for broadcast messages
93  * @lock: spinlock governing access to structure
94  * @link: (non-standard) broadcast link structure
95  * @node: (non-standard) node structure representing b'cast link's peer node
96  * @bcast_nodes: map of broadcast-capable nodes
97  * @retransmit_to: node that most recently requested a retransmit
98  *
99  * Handles sequence numbering, fragmentation, bundling, etc.
100  */
101 struct tipc_bc_base {
102         spinlock_t lock; /* spinlock protecting broadcast structs */
103         struct tipc_link link;
104         struct tipc_node node;
105         struct sk_buff_head arrvq;
106         struct sk_buff_head inputq;
107         struct tipc_node_map bcast_nodes;
108         struct tipc_node *retransmit_to;
109 };
110
111 /**
112  * tipc_nmap_equal - test for equality of node maps
113  */
114 static int tipc_nmap_equal(struct tipc_node_map *nm_a,
115                            struct tipc_node_map *nm_b)
116 {
117         return !memcmp(nm_a, nm_b, sizeof(*nm_a));
118 }
119
120 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
121                            struct tipc_node_map *nm_b,
122                            struct tipc_node_map *nm_diff);
123 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
124 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
125 static void tipc_bclink_lock(struct net *net)
126 {
127         struct tipc_net *tn = net_generic(net, tipc_net_id);
128
129         spin_lock_bh(&tn->bcbase->lock);
130 }
131
132 static void tipc_bclink_unlock(struct net *net)
133 {
134         struct tipc_net *tn = net_generic(net, tipc_net_id);
135
136         spin_unlock_bh(&tn->bcbase->lock);
137 }
138
139 void tipc_bclink_input(struct net *net)
140 {
141         struct tipc_net *tn = net_generic(net, tipc_net_id);
142
143         tipc_sk_mcast_rcv(net, &tn->bcbase->arrvq, &tn->bcbase->inputq);
144 }
145
146 uint  tipc_bcast_get_mtu(void)
147 {
148         return MAX_PKT_DEFAULT_MCAST;
149 }
150
151 static u32 bcbuf_acks(struct sk_buff *buf)
152 {
153         return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
154 }
155
156 static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
157 {
158         TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
159 }
160
161 static void bcbuf_decr_acks(struct sk_buff *buf)
162 {
163         bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
164 }
165
166 void tipc_bclink_add_node(struct net *net, u32 addr)
167 {
168         struct tipc_net *tn = net_generic(net, tipc_net_id);
169
170         tipc_bclink_lock(net);
171         tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
172         tipc_bclink_unlock(net);
173 }
174
175 void tipc_bclink_remove_node(struct net *net, u32 addr)
176 {
177         struct tipc_net *tn = net_generic(net, tipc_net_id);
178
179         tipc_bclink_lock(net);
180         tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
181
182         /* Last node? => reset backlog queue */
183         if (!tn->bcbase->bcast_nodes.count)
184                 tipc_link_purge_backlog(&tn->bcbase->link);
185
186         tipc_bclink_unlock(net);
187 }
188
189 static void bclink_set_last_sent(struct net *net)
190 {
191         struct tipc_net *tn = net_generic(net, tipc_net_id);
192         struct tipc_link *bcl = tn->bcl;
193
194         bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
195 }
196
197 u32 tipc_bclink_get_last_sent(struct net *net)
198 {
199         struct tipc_net *tn = net_generic(net, tipc_net_id);
200
201         return tn->bcl->silent_intv_cnt;
202 }
203
204 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
205 {
206         node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
207                                                 seqno : node->bclink.last_sent;
208 }
209
210 /**
211  * tipc_bclink_retransmit_to - get most recent node to request retransmission
212  *
213  * Called with bclink_lock locked
214  */
215 struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
216 {
217         struct tipc_net *tn = net_generic(net, tipc_net_id);
218
219         return tn->bcbase->retransmit_to;
220 }
221
222 /**
223  * bclink_retransmit_pkt - retransmit broadcast packets
224  * @after: sequence number of last packet to *not* retransmit
225  * @to: sequence number of last packet to retransmit
226  *
227  * Called with bclink_lock locked
228  */
229 static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
230 {
231         struct sk_buff *skb;
232         struct tipc_link *bcl = tn->bcl;
233
234         skb_queue_walk(&bcl->transmq, skb) {
235                 if (more(buf_seqno(skb), after)) {
236                         tipc_link_retransmit(bcl, skb, mod(to - after));
237                         break;
238                 }
239         }
240 }
241
242 /**
243  * bclink_prepare_wakeup - prepare users for wakeup after congestion
244  * @bcl: broadcast link
245  * @resultq: queue for users which can be woken up
246  * Move a number of waiting users, as permitted by available space in
247  * the send queue, from link wait queue to specified queue for wakeup
248  */
249 static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
250 {
251         int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
252         int imp, lim;
253         struct sk_buff *skb, *tmp;
254
255         skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
256                 imp = TIPC_SKB_CB(skb)->chain_imp;
257                 lim = bcl->window + bcl->backlog[imp].limit;
258                 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
259                 if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
260                         continue;
261                 skb_unlink(skb, &bcl->wakeupq);
262                 skb_queue_tail(resultq, skb);
263         }
264 }
265
266 /**
267  * tipc_bclink_wakeup_users - wake up pending users
268  *
269  * Called with no locks taken
270  */
271 void tipc_bclink_wakeup_users(struct net *net)
272 {
273         struct tipc_net *tn = net_generic(net, tipc_net_id);
274         struct tipc_link *bcl = tn->bcl;
275         struct sk_buff_head resultq;
276
277         skb_queue_head_init(&resultq);
278         bclink_prepare_wakeup(bcl, &resultq);
279         tipc_sk_rcv(net, &resultq);
280 }
281
282 /**
283  * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
284  * @n_ptr: node that sent acknowledgement info
285  * @acked: broadcast sequence # that has been acknowledged
286  *
287  * Node is locked, bclink_lock unlocked.
288  */
289 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
290 {
291         struct sk_buff *skb, *tmp;
292         unsigned int released = 0;
293         struct net *net = n_ptr->net;
294         struct tipc_net *tn = net_generic(net, tipc_net_id);
295
296         if (unlikely(!n_ptr->bclink.recv_permitted))
297                 return;
298
299         tipc_bclink_lock(net);
300
301         /* Bail out if tx queue is empty (no clean up is required) */
302         skb = skb_peek(&tn->bcl->transmq);
303         if (!skb)
304                 goto exit;
305
306         /* Determine which messages need to be acknowledged */
307         if (acked == INVALID_LINK_SEQ) {
308                 /*
309                  * Contact with specified node has been lost, so need to
310                  * acknowledge sent messages only (if other nodes still exist)
311                  * or both sent and unsent messages (otherwise)
312                  */
313                 if (tn->bcbase->bcast_nodes.count)
314                         acked = tn->bcl->silent_intv_cnt;
315                 else
316                         acked = tn->bcl->snd_nxt;
317         } else {
318                 /*
319                  * Bail out if specified sequence number does not correspond
320                  * to a message that has been sent and not yet acknowledged
321                  */
322                 if (less(acked, buf_seqno(skb)) ||
323                     less(tn->bcl->silent_intv_cnt, acked) ||
324                     less_eq(acked, n_ptr->bclink.acked))
325                         goto exit;
326         }
327
328         /* Skip over packets that node has previously acknowledged */
329         skb_queue_walk(&tn->bcl->transmq, skb) {
330                 if (more(buf_seqno(skb), n_ptr->bclink.acked))
331                         break;
332         }
333
334         /* Update packets that node is now acknowledging */
335         skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
336                 if (more(buf_seqno(skb), acked))
337                         break;
338                 bcbuf_decr_acks(skb);
339                 bclink_set_last_sent(net);
340                 if (bcbuf_acks(skb) == 0) {
341                         __skb_unlink(skb, &tn->bcl->transmq);
342                         kfree_skb(skb);
343                         released = 1;
344                 }
345         }
346         n_ptr->bclink.acked = acked;
347
348         /* Try resolving broadcast link congestion, if necessary */
349         if (unlikely(skb_peek(&tn->bcl->backlogq))) {
350                 tipc_link_push_packets(tn->bcl);
351                 bclink_set_last_sent(net);
352         }
353         if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
354                 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
355 exit:
356         tipc_bclink_unlock(net);
357 }
358
359 /**
360  * tipc_bclink_update_link_state - update broadcast link state
361  *
362  * RCU and node lock set
363  */
364 void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
365                                    u32 last_sent)
366 {
367         struct sk_buff *buf;
368         struct net *net = n_ptr->net;
369         struct tipc_net *tn = net_generic(net, tipc_net_id);
370
371         /* Ignore "stale" link state info */
372         if (less_eq(last_sent, n_ptr->bclink.last_in))
373                 return;
374
375         /* Update link synchronization state; quit if in sync */
376         bclink_update_last_sent(n_ptr, last_sent);
377
378         if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
379                 return;
380
381         /* Update out-of-sync state; quit if loss is still unconfirmed */
382         if ((++n_ptr->bclink.oos_state) == 1) {
383                 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
384                         return;
385                 n_ptr->bclink.oos_state++;
386         }
387
388         /* Don't NACK if one has been recently sent (or seen) */
389         if (n_ptr->bclink.oos_state & 0x1)
390                 return;
391
392         /* Send NACK */
393         buf = tipc_buf_acquire(INT_H_SIZE);
394         if (buf) {
395                 struct tipc_msg *msg = buf_msg(buf);
396                 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
397                 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
398
399                 tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
400                               INT_H_SIZE, n_ptr->addr);
401                 msg_set_non_seq(msg, 1);
402                 msg_set_mc_netid(msg, tn->net_id);
403                 msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
404                 msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
405                 msg_set_bcgap_to(msg, to);
406
407                 tipc_bclink_lock(net);
408                 tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
409                 tn->bcl->stats.sent_nacks++;
410                 tipc_bclink_unlock(net);
411                 kfree_skb(buf);
412
413                 n_ptr->bclink.oos_state++;
414         }
415 }
416
417 void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
418 {
419         u16 last = msg_last_bcast(hdr);
420         int mtyp = msg_type(hdr);
421
422         if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
423                 return;
424         if (mtyp == STATE_MSG) {
425                 tipc_bclink_update_link_state(n, last);
426                 return;
427         }
428         /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
429          * and transfer synch info in LINK_PROTOCOL messages.
430          */
431         if (tipc_node_is_up(n))
432                 return;
433         if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
434                 return;
435         n->bclink.last_sent = last;
436         n->bclink.last_in = last;
437         n->bclink.oos_state = 0;
438 }
439
440 /**
441  * bclink_peek_nack - monitor retransmission requests sent by other nodes
442  *
443  * Delay any upcoming NACK by this node if another node has already
444  * requested the first message this node is going to ask for.
445  */
446 static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
447 {
448         struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
449
450         if (unlikely(!n_ptr))
451                 return;
452
453         tipc_node_lock(n_ptr);
454         if (n_ptr->bclink.recv_permitted &&
455             (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
456             (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
457                 n_ptr->bclink.oos_state = 2;
458         tipc_node_unlock(n_ptr);
459         tipc_node_put(n_ptr);
460 }
461
462 /* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
463  *                    and to identified node local sockets
464  * @net: the applicable net namespace
465  * @list: chain of buffers containing message
466  * Consumes the buffer chain, except when returning -ELINKCONG
467  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
468  */
469 int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
470 {
471         struct tipc_net *tn = net_generic(net, tipc_net_id);
472         struct tipc_link *bcl = tn->bcl;
473         struct tipc_bc_base *bclink = tn->bcbase;
474         int rc = 0;
475         int bc = 0;
476         struct sk_buff *skb;
477         struct sk_buff_head arrvq;
478         struct sk_buff_head inputq;
479
480         /* Prepare clone of message for local node */
481         skb = tipc_msg_reassemble(list);
482         if (unlikely(!skb))
483                 return -EHOSTUNREACH;
484
485         /* Broadcast to all nodes */
486         if (likely(bclink)) {
487                 tipc_bclink_lock(net);
488                 if (likely(bclink->bcast_nodes.count)) {
489                         rc = __tipc_link_xmit(net, bcl, list);
490                         if (likely(!rc)) {
491                                 u32 len = skb_queue_len(&bcl->transmq);
492
493                                 bclink_set_last_sent(net);
494                                 bcl->stats.queue_sz_counts++;
495                                 bcl->stats.accu_queue_sz += len;
496                         }
497                         bc = 1;
498                 }
499                 tipc_bclink_unlock(net);
500         }
501
502         if (unlikely(!bc))
503                 __skb_queue_purge(list);
504
505         if (unlikely(rc)) {
506                 kfree_skb(skb);
507                 return rc;
508         }
509         /* Deliver message clone */
510         __skb_queue_head_init(&arrvq);
511         skb_queue_head_init(&inputq);
512         __skb_queue_tail(&arrvq, skb);
513         tipc_sk_mcast_rcv(net, &arrvq, &inputq);
514         return rc;
515 }
516
517 /**
518  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
519  *
520  * Called with both sending node's lock and bclink_lock taken.
521  */
522 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
523 {
524         struct tipc_net *tn = net_generic(node->net, tipc_net_id);
525
526         bclink_update_last_sent(node, seqno);
527         node->bclink.last_in = seqno;
528         node->bclink.oos_state = 0;
529         tn->bcl->stats.recv_info++;
530
531         /*
532          * Unicast an ACK periodically, ensuring that
533          * all nodes in the cluster don't ACK at the same time
534          */
535         if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
536                 tipc_link_proto_xmit(node_active_link(node, node->addr),
537                                      STATE_MSG, 0, 0, 0, 0);
538                 tn->bcl->stats.sent_acks++;
539         }
540 }
541
542 /**
543  * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
544  *
545  * RCU is locked, no other locks set
546  */
547 void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
548 {
549         struct tipc_net *tn = net_generic(net, tipc_net_id);
550         struct tipc_link *bcl = tn->bcl;
551         struct tipc_msg *msg = buf_msg(buf);
552         struct tipc_node *node;
553         u32 next_in;
554         u32 seqno;
555         int deferred = 0;
556         int pos = 0;
557         struct sk_buff *iskb;
558         struct sk_buff_head *arrvq, *inputq;
559
560         /* Screen out unwanted broadcast messages */
561         if (msg_mc_netid(msg) != tn->net_id)
562                 goto exit;
563
564         node = tipc_node_find(net, msg_prevnode(msg));
565         if (unlikely(!node))
566                 goto exit;
567
568         tipc_node_lock(node);
569         if (unlikely(!node->bclink.recv_permitted))
570                 goto unlock;
571
572         /* Handle broadcast protocol message */
573         if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
574                 if (msg_type(msg) != STATE_MSG)
575                         goto unlock;
576                 if (msg_destnode(msg) == tn->own_addr) {
577                         tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
578                         tipc_bclink_lock(net);
579                         bcl->stats.recv_nacks++;
580                         tn->bcbase->retransmit_to = node;
581                         bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
582                                               msg_bcgap_to(msg));
583                         tipc_bclink_unlock(net);
584                         tipc_node_unlock(node);
585                 } else {
586                         tipc_node_unlock(node);
587                         bclink_peek_nack(net, msg);
588                 }
589                 tipc_node_put(node);
590                 goto exit;
591         }
592
593         /* Handle in-sequence broadcast message */
594         seqno = msg_seqno(msg);
595         next_in = mod(node->bclink.last_in + 1);
596         arrvq = &tn->bcbase->arrvq;
597         inputq = &tn->bcbase->inputq;
598
599         if (likely(seqno == next_in)) {
600 receive:
601                 /* Deliver message to destination */
602                 if (likely(msg_isdata(msg))) {
603                         tipc_bclink_lock(net);
604                         bclink_accept_pkt(node, seqno);
605                         spin_lock_bh(&inputq->lock);
606                         __skb_queue_tail(arrvq, buf);
607                         spin_unlock_bh(&inputq->lock);
608                         node->action_flags |= TIPC_BCAST_MSG_EVT;
609                         tipc_bclink_unlock(net);
610                         tipc_node_unlock(node);
611                 } else if (msg_user(msg) == MSG_BUNDLER) {
612                         tipc_bclink_lock(net);
613                         bclink_accept_pkt(node, seqno);
614                         bcl->stats.recv_bundles++;
615                         bcl->stats.recv_bundled += msg_msgcnt(msg);
616                         pos = 0;
617                         while (tipc_msg_extract(buf, &iskb, &pos)) {
618                                 spin_lock_bh(&inputq->lock);
619                                 __skb_queue_tail(arrvq, iskb);
620                                 spin_unlock_bh(&inputq->lock);
621                         }
622                         node->action_flags |= TIPC_BCAST_MSG_EVT;
623                         tipc_bclink_unlock(net);
624                         tipc_node_unlock(node);
625                 } else if (msg_user(msg) == MSG_FRAGMENTER) {
626                         tipc_bclink_lock(net);
627                         bclink_accept_pkt(node, seqno);
628                         tipc_buf_append(&node->bclink.reasm_buf, &buf);
629                         if (unlikely(!buf && !node->bclink.reasm_buf)) {
630                                 tipc_bclink_unlock(net);
631                                 goto unlock;
632                         }
633                         bcl->stats.recv_fragments++;
634                         if (buf) {
635                                 bcl->stats.recv_fragmented++;
636                                 msg = buf_msg(buf);
637                                 tipc_bclink_unlock(net);
638                                 goto receive;
639                         }
640                         tipc_bclink_unlock(net);
641                         tipc_node_unlock(node);
642                 } else {
643                         tipc_bclink_lock(net);
644                         bclink_accept_pkt(node, seqno);
645                         tipc_bclink_unlock(net);
646                         tipc_node_unlock(node);
647                         kfree_skb(buf);
648                 }
649                 buf = NULL;
650
651                 /* Determine new synchronization state */
652                 tipc_node_lock(node);
653                 if (unlikely(!tipc_node_is_up(node)))
654                         goto unlock;
655
656                 if (node->bclink.last_in == node->bclink.last_sent)
657                         goto unlock;
658
659                 if (skb_queue_empty(&node->bclink.deferdq)) {
660                         node->bclink.oos_state = 1;
661                         goto unlock;
662                 }
663
664                 msg = buf_msg(skb_peek(&node->bclink.deferdq));
665                 seqno = msg_seqno(msg);
666                 next_in = mod(next_in + 1);
667                 if (seqno != next_in)
668                         goto unlock;
669
670                 /* Take in-sequence message from deferred queue & deliver it */
671                 buf = __skb_dequeue(&node->bclink.deferdq);
672                 goto receive;
673         }
674
675         /* Handle out-of-sequence broadcast message */
676         if (less(next_in, seqno)) {
677                 deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
678                                                buf);
679                 bclink_update_last_sent(node, seqno);
680                 buf = NULL;
681         }
682
683         tipc_bclink_lock(net);
684
685         if (deferred)
686                 bcl->stats.deferred_recv++;
687         else
688                 bcl->stats.duplicates++;
689
690         tipc_bclink_unlock(net);
691
692 unlock:
693         tipc_node_unlock(node);
694         tipc_node_put(node);
695 exit:
696         kfree_skb(buf);
697 }
698
699 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
700 {
701         return (n_ptr->bclink.recv_permitted &&
702                 (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
703 }
704
705
706 /**
707  * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
708  *
709  * Send packet over as many bearers as necessary to reach all nodes
710  * that have joined the broadcast link.
711  *
712  * Returns 0 (packet sent successfully) under all circumstances,
713  * since the broadcast link's pseudo-bearer never blocks
714  */
715 static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
716                               struct tipc_bearer *unused1,
717                               struct tipc_media_addr *unused2)
718 {
719         int bp_index;
720         struct tipc_msg *msg = buf_msg(buf);
721         struct tipc_net *tn = net_generic(net, tipc_net_id);
722         struct tipc_bcbearer *bcbearer = tn->bcbearer;
723         struct tipc_bc_base *bclink = tn->bcbase;
724
725         /* Prepare broadcast link message for reliable transmission,
726          * if first time trying to send it;
727          * preparation is skipped for broadcast link protocol messages
728          * since they are sent in an unreliable manner and don't need it
729          */
730         if (likely(!msg_non_seq(buf_msg(buf)))) {
731                 bcbuf_set_acks(buf, bclink->bcast_nodes.count);
732                 msg_set_non_seq(msg, 1);
733                 msg_set_mc_netid(msg, tn->net_id);
734                 tn->bcl->stats.sent_info++;
735                 if (WARN_ON(!bclink->bcast_nodes.count)) {
736                         dump_stack();
737                         return 0;
738                 }
739         }
740
741         /* Send buffer over bearers until all targets reached */
742         bcbearer->remains = bclink->bcast_nodes;
743
744         for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
745                 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
746                 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
747                 struct tipc_bearer *bp[2] = {p, s};
748                 struct tipc_bearer *b = bp[msg_link_selector(msg)];
749                 struct sk_buff *tbuf;
750
751                 if (!p)
752                         break; /* No more bearers to try */
753                 if (!b)
754                         b = p;
755                 tipc_nmap_diff(&bcbearer->remains, &b->nodes,
756                                &bcbearer->remains_new);
757                 if (bcbearer->remains_new.count == bcbearer->remains.count)
758                         continue; /* Nothing added by bearer pair */
759
760                 if (bp_index == 0) {
761                         /* Use original buffer for first bearer */
762                         tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
763                 } else {
764                         /* Avoid concurrent buffer access */
765                         tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
766                         if (!tbuf)
767                                 break;
768                         tipc_bearer_send(net, b->identity, tbuf,
769                                          &b->bcast_addr);
770                         kfree_skb(tbuf); /* Bearer keeps a clone */
771                 }
772                 if (bcbearer->remains_new.count == 0)
773                         break; /* All targets reached */
774
775                 bcbearer->remains = bcbearer->remains_new;
776         }
777
778         return 0;
779 }
780
781 /**
782  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
783  */
784 void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
785                         u32 node, bool action)
786 {
787         struct tipc_net *tn = net_generic(net, tipc_net_id);
788         struct tipc_bcbearer *bcbearer = tn->bcbearer;
789         struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
790         struct tipc_bcbearer_pair *bp_curr;
791         struct tipc_bearer *b;
792         int b_index;
793         int pri;
794
795         tipc_bclink_lock(net);
796
797         if (action)
798                 tipc_nmap_add(nm_ptr, node);
799         else
800                 tipc_nmap_remove(nm_ptr, node);
801
802         /* Group bearers by priority (can assume max of two per priority) */
803         memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
804
805         rcu_read_lock();
806         for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
807                 b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
808                 if (!b || !b->nodes.count)
809                         continue;
810
811                 if (!bp_temp[b->priority].primary)
812                         bp_temp[b->priority].primary = b;
813                 else
814                         bp_temp[b->priority].secondary = b;
815         }
816         rcu_read_unlock();
817
818         /* Create array of bearer pairs for broadcasting */
819         bp_curr = bcbearer->bpairs;
820         memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
821
822         for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
823
824                 if (!bp_temp[pri].primary)
825                         continue;
826
827                 bp_curr->primary = bp_temp[pri].primary;
828
829                 if (bp_temp[pri].secondary) {
830                         if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
831                                             &bp_temp[pri].secondary->nodes)) {
832                                 bp_curr->secondary = bp_temp[pri].secondary;
833                         } else {
834                                 bp_curr++;
835                                 bp_curr->primary = bp_temp[pri].secondary;
836                         }
837                 }
838
839                 bp_curr++;
840         }
841
842         tipc_bclink_unlock(net);
843 }
844
845 static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
846                                       struct tipc_stats *stats)
847 {
848         int i;
849         struct nlattr *nest;
850
851         struct nla_map {
852                 __u32 key;
853                 __u32 val;
854         };
855
856         struct nla_map map[] = {
857                 {TIPC_NLA_STATS_RX_INFO, stats->recv_info},
858                 {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
859                 {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
860                 {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
861                 {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
862                 {TIPC_NLA_STATS_TX_INFO, stats->sent_info},
863                 {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
864                 {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
865                 {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
866                 {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
867                 {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
868                 {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
869                 {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
870                 {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
871                 {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
872                 {TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
873                 {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
874                 {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
875                 {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
876                         (stats->accu_queue_sz / stats->queue_sz_counts) : 0}
877         };
878
879         nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
880         if (!nest)
881                 return -EMSGSIZE;
882
883         for (i = 0; i <  ARRAY_SIZE(map); i++)
884                 if (nla_put_u32(skb, map[i].key, map[i].val))
885                         goto msg_full;
886
887         nla_nest_end(skb, nest);
888
889         return 0;
890 msg_full:
891         nla_nest_cancel(skb, nest);
892
893         return -EMSGSIZE;
894 }
895
896 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
897 {
898         int err;
899         void *hdr;
900         struct nlattr *attrs;
901         struct nlattr *prop;
902         struct tipc_net *tn = net_generic(net, tipc_net_id);
903         struct tipc_link *bcl = tn->bcl;
904
905         if (!bcl)
906                 return 0;
907
908         tipc_bclink_lock(net);
909
910         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
911                           NLM_F_MULTI, TIPC_NL_LINK_GET);
912         if (!hdr)
913                 return -EMSGSIZE;
914
915         attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
916         if (!attrs)
917                 goto msg_full;
918
919         /* The broadcast link is always up */
920         if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
921                 goto attr_msg_full;
922
923         if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
924                 goto attr_msg_full;
925         if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
926                 goto attr_msg_full;
927         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
928                 goto attr_msg_full;
929         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
930                 goto attr_msg_full;
931
932         prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
933         if (!prop)
934                 goto attr_msg_full;
935         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
936                 goto prop_msg_full;
937         nla_nest_end(msg->skb, prop);
938
939         err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
940         if (err)
941                 goto attr_msg_full;
942
943         tipc_bclink_unlock(net);
944         nla_nest_end(msg->skb, attrs);
945         genlmsg_end(msg->skb, hdr);
946
947         return 0;
948
949 prop_msg_full:
950         nla_nest_cancel(msg->skb, prop);
951 attr_msg_full:
952         nla_nest_cancel(msg->skb, attrs);
953 msg_full:
954         tipc_bclink_unlock(net);
955         genlmsg_cancel(msg->skb, hdr);
956
957         return -EMSGSIZE;
958 }
959
960 int tipc_bclink_reset_stats(struct net *net)
961 {
962         struct tipc_net *tn = net_generic(net, tipc_net_id);
963         struct tipc_link *bcl = tn->bcl;
964
965         if (!bcl)
966                 return -ENOPROTOOPT;
967
968         tipc_bclink_lock(net);
969         memset(&bcl->stats, 0, sizeof(bcl->stats));
970         tipc_bclink_unlock(net);
971         return 0;
972 }
973
974 int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
975 {
976         struct tipc_net *tn = net_generic(net, tipc_net_id);
977         struct tipc_link *bcl = tn->bcl;
978
979         if (!bcl)
980                 return -ENOPROTOOPT;
981         if (limit < BCLINK_WIN_MIN)
982                 limit = BCLINK_WIN_MIN;
983         if (limit > TIPC_MAX_LINK_WIN)
984                 return -EINVAL;
985         tipc_bclink_lock(net);
986         tipc_link_set_queue_limits(bcl, limit);
987         tipc_bclink_unlock(net);
988         return 0;
989 }
990
991 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
992 {
993         int err;
994         u32 win;
995         struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
996
997         if (!attrs[TIPC_NLA_LINK_PROP])
998                 return -EINVAL;
999
1000         err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
1001         if (err)
1002                 return err;
1003
1004         if (!props[TIPC_NLA_PROP_WIN])
1005                 return -EOPNOTSUPP;
1006
1007         win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1008
1009         return tipc_bclink_set_queue_limits(net, win);
1010 }
1011
1012 int tipc_bcast_init(struct net *net)
1013 {
1014         struct tipc_net *tn = net_generic(net, tipc_net_id);
1015         struct tipc_bcbearer *bcbearer;
1016         struct tipc_bc_base *bclink;
1017         struct tipc_link *bcl;
1018
1019         bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
1020         if (!bcbearer)
1021                 return -ENOMEM;
1022
1023         bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
1024         if (!bclink) {
1025                 kfree(bcbearer);
1026                 return -ENOMEM;
1027         }
1028
1029         bcl = &bclink->link;
1030         bcbearer->bearer.media = &bcbearer->media;
1031         bcbearer->media.send_msg = tipc_bcbearer_send;
1032         sprintf(bcbearer->media.name, "tipc-broadcast");
1033
1034         spin_lock_init(&bclink->lock);
1035         __skb_queue_head_init(&bcl->transmq);
1036         __skb_queue_head_init(&bcl->backlogq);
1037         __skb_queue_head_init(&bcl->deferdq);
1038         skb_queue_head_init(&bcl->wakeupq);
1039         bcl->snd_nxt = 1;
1040         spin_lock_init(&bclink->node.lock);
1041         __skb_queue_head_init(&bclink->arrvq);
1042         skb_queue_head_init(&bclink->inputq);
1043         bcl->owner = &bclink->node;
1044         bcl->owner->net = net;
1045         bcl->mtu = MAX_PKT_DEFAULT_MCAST;
1046         tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
1047         bcl->bearer_id = MAX_BEARERS;
1048         rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
1049         bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
1050         msg_set_prevnode(bcl->pmsg, tn->own_addr);
1051         strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
1052         tn->bcbearer = bcbearer;
1053         tn->bcbase = bclink;
1054         tn->bcl = bcl;
1055         return 0;
1056 }
1057
1058 void tipc_bcast_stop(struct net *net)
1059 {
1060         struct tipc_net *tn = net_generic(net, tipc_net_id);
1061
1062         tipc_bclink_lock(net);
1063         tipc_link_purge_queues(tn->bcl);
1064         tipc_bclink_unlock(net);
1065
1066         RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
1067         synchronize_net();
1068         kfree(tn->bcbearer);
1069         kfree(tn->bcbase);
1070 }
1071
1072 /**
1073  * tipc_nmap_add - add a node to a node map
1074  */
1075 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
1076 {
1077         int n = tipc_node(node);
1078         int w = n / WSIZE;
1079         u32 mask = (1 << (n % WSIZE));
1080
1081         if ((nm_ptr->map[w] & mask) == 0) {
1082                 nm_ptr->count++;
1083                 nm_ptr->map[w] |= mask;
1084         }
1085 }
1086
1087 /**
1088  * tipc_nmap_remove - remove a node from a node map
1089  */
1090 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
1091 {
1092         int n = tipc_node(node);
1093         int w = n / WSIZE;
1094         u32 mask = (1 << (n % WSIZE));
1095
1096         if ((nm_ptr->map[w] & mask) != 0) {
1097                 nm_ptr->map[w] &= ~mask;
1098                 nm_ptr->count--;
1099         }
1100 }
1101
1102 /**
1103  * tipc_nmap_diff - find differences between node maps
1104  * @nm_a: input node map A
1105  * @nm_b: input node map B
1106  * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
1107  */
1108 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
1109                            struct tipc_node_map *nm_b,
1110                            struct tipc_node_map *nm_diff)
1111 {
1112         int stop = ARRAY_SIZE(nm_a->map);
1113         int w;
1114         int b;
1115         u32 map;
1116
1117         memset(nm_diff, 0, sizeof(*nm_diff));
1118         for (w = 0; w < stop; w++) {
1119                 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
1120                 nm_diff->map[w] = map;
1121                 if (map != 0) {
1122                         for (b = 0 ; b < WSIZE; b++) {
1123                                 if (map & (1 << b))
1124                                         nm_diff->count++;
1125                         }
1126                 }
1127         }
1128 }