7fdf895e797385cbf8e96cd139042e4454500fd8
[cascardo/linux.git] / net / tipc / bcast.c
1 /*
2  * net/tipc/bcast.c: TIPC broadcast code
3  *
4  * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
5  * Copyright (c) 2004, Intel Corporation.
6  * Copyright (c) 2005, 2010-2011, Wind River Systems
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in the
16  *    documentation and/or other materials provided with the distribution.
17  * 3. Neither the names of the copyright holders nor the names of its
18  *    contributors may be used to endorse or promote products derived from
19  *    this software without specific prior written permission.
20  *
21  * Alternatively, this software may be distributed under the terms of the
22  * GNU General Public License ("GPL") version 2 as published by the Free
23  * Software Foundation.
24  *
25  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
26  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
28  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
29  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
30  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
31  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
32  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
33  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
34  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
35  * POSSIBILITY OF SUCH DAMAGE.
36  */
37
38 #include <linux/tipc_config.h>
39 #include "socket.h"
40 #include "msg.h"
41 #include "bcast.h"
42 #include "name_distr.h"
43 #include "link.h"
44 #include "node.h"
45
46 #define MAX_PKT_DEFAULT_MCAST   1500    /* bcast link max packet size (fixed) */
47 #define BCLINK_WIN_DEFAULT      50      /* bcast link window size (default) */
48 #define BCLINK_WIN_MIN          32      /* bcast minimum link window size */
49
50 const char tipc_bclink_name[] = "broadcast-link";
51
52 /**
53  * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
54  * @primary: pointer to primary bearer
55  * @secondary: pointer to secondary bearer
56  *
57  * Bearers must have same priority and same set of reachable destinations
58  * to be paired.
59  */
60
61 struct tipc_bcbearer_pair {
62         struct tipc_bearer *primary;
63         struct tipc_bearer *secondary;
64 };
65
66 #define BCBEARER                MAX_BEARERS
67
68 /**
69  * struct tipc_bcbearer - bearer used by broadcast link
70  * @bearer: (non-standard) broadcast bearer structure
71  * @media: (non-standard) broadcast media structure
72  * @bpairs: array of bearer pairs
73  * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
74  * @remains: temporary node map used by tipc_bcbearer_send()
75  * @remains_new: temporary node map used tipc_bcbearer_send()
76  *
77  * Note: The fields labelled "temporary" are incorporated into the bearer
78  * to avoid consuming potentially limited stack space through the use of
79  * large local variables within multicast routines.  Concurrent access is
80  * prevented through use of the spinlock "bcast_lock".
81  */
82 struct tipc_bcbearer {
83         struct tipc_bearer bearer;
84         struct tipc_media media;
85         struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
86         struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
87         struct tipc_node_map remains;
88         struct tipc_node_map remains_new;
89 };
90
91 /**
92  * struct tipc_bc_base - link used for broadcast messages
93  * @link: (non-standard) broadcast link structure
94  * @node: (non-standard) node structure representing b'cast link's peer node
95  * @bcast_nodes: map of broadcast-capable nodes
96  * @retransmit_to: node that most recently requested a retransmit
97  *
98  * Handles sequence numbering, fragmentation, bundling, etc.
99  */
100 struct tipc_bc_base {
101         struct tipc_link *link;
102         struct tipc_node node;
103         struct sk_buff_head arrvq;
104         struct sk_buff_head inputq;
105         struct sk_buff_head namedq;
106         struct tipc_node_map bcast_nodes;
107         struct tipc_node *retransmit_to;
108 };
109
110 static struct tipc_bc_base *tipc_bc_base(struct net *net)
111 {
112         return tipc_net(net)->bcbase;
113 }
114
115 static struct tipc_link *tipc_bc_sndlink(struct net *net)
116 {
117         return tipc_net(net)->bcl;
118 }
119
120 /**
121  * tipc_nmap_equal - test for equality of node maps
122  */
123 static int tipc_nmap_equal(struct tipc_node_map *nm_a,
124                            struct tipc_node_map *nm_b)
125 {
126         return !memcmp(nm_a, nm_b, sizeof(*nm_a));
127 }
128
129 static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
130 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
131                            struct tipc_node_map *nm_b,
132                            struct tipc_node_map *nm_diff);
133 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
134 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
135 static void tipc_bclink_lock(struct net *net)
136 {
137         tipc_bcast_lock(net);
138 }
139
140 static void tipc_bclink_unlock(struct net *net)
141 {
142         tipc_bcast_unlock(net);
143 }
144
145 void tipc_bclink_input(struct net *net)
146 {
147         struct tipc_net *tn = net_generic(net, tipc_net_id);
148
149         tipc_sk_mcast_rcv(net, &tn->bcbase->arrvq, &tn->bcbase->inputq);
150 }
151
152 uint  tipc_bcast_get_mtu(void)
153 {
154         return MAX_PKT_DEFAULT_MCAST;
155 }
156
157 static u16 bcbuf_acks(struct sk_buff *skb)
158 {
159         return TIPC_SKB_CB(skb)->ackers;
160 }
161
162 static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
163 {
164         TIPC_SKB_CB(buf)->ackers = ackers;
165 }
166
167 static void bcbuf_decr_acks(struct sk_buff *buf)
168 {
169         bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
170 }
171
172 void tipc_bclink_add_node(struct net *net, u32 addr)
173 {
174         struct tipc_net *tn = net_generic(net, tipc_net_id);
175         struct tipc_link *l = tipc_bc_sndlink(net);
176         tipc_bclink_lock(net);
177         tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
178         tipc_link_add_bc_peer(l);
179         tipc_bclink_unlock(net);
180 }
181
182 void tipc_bclink_remove_node(struct net *net, u32 addr)
183 {
184         struct tipc_net *tn = net_generic(net, tipc_net_id);
185
186         tipc_bclink_lock(net);
187         tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
188         tn->bcl->ackers--;
189
190         /* Last node? => reset backlog queue */
191         if (!tn->bcbase->bcast_nodes.count)
192                 tipc_link_purge_backlog(tn->bcbase->link);
193
194         tipc_bclink_unlock(net);
195 }
196
197 static void bclink_set_last_sent(struct net *net)
198 {
199         struct tipc_net *tn = net_generic(net, tipc_net_id);
200         struct tipc_link *bcl = tn->bcl;
201
202         bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
203 }
204
205 u32 tipc_bclink_get_last_sent(struct net *net)
206 {
207         struct tipc_net *tn = net_generic(net, tipc_net_id);
208
209         return tn->bcl->silent_intv_cnt;
210 }
211
212 static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
213 {
214         node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
215                                                 seqno : node->bclink.last_sent;
216 }
217
218 /**
219  * tipc_bclink_retransmit_to - get most recent node to request retransmission
220  *
221  * Called with bclink_lock locked
222  */
223 struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
224 {
225         struct tipc_net *tn = net_generic(net, tipc_net_id);
226
227         return tn->bcbase->retransmit_to;
228 }
229
230 /**
231  * bclink_retransmit_pkt - retransmit broadcast packets
232  * @after: sequence number of last packet to *not* retransmit
233  * @to: sequence number of last packet to retransmit
234  *
235  * Called with bclink_lock locked
236  */
237 static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
238 {
239         struct sk_buff *skb;
240         struct tipc_link *bcl = tn->bcl;
241
242         skb_queue_walk(&bcl->transmq, skb) {
243                 if (more(buf_seqno(skb), after)) {
244                         tipc_link_retransmit(bcl, skb, mod(to - after));
245                         break;
246                 }
247         }
248 }
249
250 /**
251  * bclink_prepare_wakeup - prepare users for wakeup after congestion
252  * @bcl: broadcast link
253  * @resultq: queue for users which can be woken up
254  * Move a number of waiting users, as permitted by available space in
255  * the send queue, from link wait queue to specified queue for wakeup
256  */
257 static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
258 {
259         int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
260         int imp, lim;
261         struct sk_buff *skb, *tmp;
262
263         skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
264                 imp = TIPC_SKB_CB(skb)->chain_imp;
265                 lim = bcl->window + bcl->backlog[imp].limit;
266                 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
267                 if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
268                         continue;
269                 skb_unlink(skb, &bcl->wakeupq);
270                 skb_queue_tail(resultq, skb);
271         }
272 }
273
274 /**
275  * tipc_bclink_wakeup_users - wake up pending users
276  *
277  * Called with no locks taken
278  */
279 void tipc_bclink_wakeup_users(struct net *net)
280 {
281         struct tipc_net *tn = net_generic(net, tipc_net_id);
282         struct tipc_link *bcl = tn->bcl;
283         struct sk_buff_head resultq;
284
285         skb_queue_head_init(&resultq);
286         bclink_prepare_wakeup(bcl, &resultq);
287         tipc_sk_rcv(net, &resultq);
288 }
289
290 /**
291  * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
292  * @n_ptr: node that sent acknowledgement info
293  * @acked: broadcast sequence # that has been acknowledged
294  *
295  * Node is locked, bclink_lock unlocked.
296  */
297 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
298 {
299         struct sk_buff *skb, *tmp;
300         unsigned int released = 0;
301         struct net *net = n_ptr->net;
302         struct tipc_net *tn = net_generic(net, tipc_net_id);
303
304         if (unlikely(!n_ptr->bclink.recv_permitted))
305                 return;
306         tipc_bclink_lock(net);
307
308         /* Bail out if tx queue is empty (no clean up is required) */
309         skb = skb_peek(&tn->bcl->transmq);
310         if (!skb)
311                 goto exit;
312
313         /* Determine which messages need to be acknowledged */
314         if (acked == INVALID_LINK_SEQ) {
315                 /*
316                  * Contact with specified node has been lost, so need to
317                  * acknowledge sent messages only (if other nodes still exist)
318                  * or both sent and unsent messages (otherwise)
319                  */
320                 if (tn->bcbase->bcast_nodes.count)
321                         acked = tn->bcl->silent_intv_cnt;
322                 else
323                         acked = tn->bcl->snd_nxt;
324         } else {
325                 /*
326                  * Bail out if specified sequence number does not correspond
327                  * to a message that has been sent and not yet acknowledged
328                  */
329                 if (less(acked, buf_seqno(skb)) ||
330                     less(tn->bcl->silent_intv_cnt, acked) ||
331                     less_eq(acked, n_ptr->bclink.acked))
332                         goto exit;
333         }
334         /* Skip over packets that node has previously acknowledged */
335         skb_queue_walk(&tn->bcl->transmq, skb) {
336                 if (more(buf_seqno(skb), n_ptr->bclink.acked))
337                         break;
338         }
339         /* Update packets that node is now acknowledging */
340         skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
341                 if (more(buf_seqno(skb), acked))
342                         break;
343                 bcbuf_decr_acks(skb);
344                 bclink_set_last_sent(net);
345                 if (bcbuf_acks(skb) == 0) {
346                         __skb_unlink(skb, &tn->bcl->transmq);
347                         kfree_skb(skb);
348                         released = 1;
349                 }
350         }
351         n_ptr->bclink.acked = acked;
352
353         /* Try resolving broadcast link congestion, if necessary */
354         if (unlikely(skb_peek(&tn->bcl->backlogq))) {
355                 tipc_link_push_packets(tn->bcl);
356                 bclink_set_last_sent(net);
357         }
358         if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
359                 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
360 exit:
361         tipc_bclink_unlock(net);
362 }
363
364 /**
365  * tipc_bclink_update_link_state - update broadcast link state
366  *
367  * RCU and node lock set
368  */
369 void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
370                                    u32 last_sent)
371 {
372         struct sk_buff *buf;
373         struct net *net = n_ptr->net;
374         struct tipc_net *tn = net_generic(net, tipc_net_id);
375         struct tipc_link *bcl = tn->bcl;
376
377         /* Ignore "stale" link state info */
378         if (less_eq(last_sent, n_ptr->bclink.last_in))
379                 return;
380
381         /* Update link synchronization state; quit if in sync */
382         bclink_update_last_sent(n_ptr, last_sent);
383
384         /* This is a good location for statistical profiling */
385         bcl->stats.queue_sz_counts++;
386         bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
387
388         if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
389                 return;
390
391         /* Update out-of-sync state; quit if loss is still unconfirmed */
392         if ((++n_ptr->bclink.oos_state) == 1) {
393                 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
394                         return;
395                 n_ptr->bclink.oos_state++;
396         }
397
398         /* Don't NACK if one has been recently sent (or seen) */
399         if (n_ptr->bclink.oos_state & 0x1)
400                 return;
401
402         /* Send NACK */
403         buf = tipc_buf_acquire(INT_H_SIZE);
404         if (buf) {
405                 struct tipc_msg *msg = buf_msg(buf);
406                 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
407                 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
408
409                 tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
410                               INT_H_SIZE, n_ptr->addr);
411                 msg_set_non_seq(msg, 1);
412                 msg_set_mc_netid(msg, tn->net_id);
413                 msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
414                 msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
415                 msg_set_bcgap_to(msg, to);
416
417                 tipc_bclink_lock(net);
418                 tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
419                 tn->bcl->stats.sent_nacks++;
420                 tipc_bclink_unlock(net);
421                 kfree_skb(buf);
422
423                 n_ptr->bclink.oos_state++;
424         }
425 }
426
427 void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
428 {
429         u16 last = msg_last_bcast(hdr);
430         int mtyp = msg_type(hdr);
431
432         if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
433                 return;
434         if (mtyp == STATE_MSG) {
435                 tipc_bclink_update_link_state(n, last);
436                 return;
437         }
438         /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
439          * and transfer synch info in LINK_PROTOCOL messages.
440          */
441         if (tipc_node_is_up(n))
442                 return;
443         if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
444                 return;
445         n->bclink.last_sent = last;
446         n->bclink.last_in = last;
447         n->bclink.oos_state = 0;
448 }
449
450 /**
451  * bclink_peek_nack - monitor retransmission requests sent by other nodes
452  *
453  * Delay any upcoming NACK by this node if another node has already
454  * requested the first message this node is going to ask for.
455  */
456 static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
457 {
458         struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
459
460         if (unlikely(!n_ptr))
461                 return;
462
463         tipc_node_lock(n_ptr);
464         if (n_ptr->bclink.recv_permitted &&
465             (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
466             (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
467                 n_ptr->bclink.oos_state = 2;
468         tipc_node_unlock(n_ptr);
469         tipc_node_put(n_ptr);
470 }
471
472 /* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
473  *                    and to identified node local sockets
474  * @net: the applicable net namespace
475  * @list: chain of buffers containing message
476  * Consumes the buffer chain, except when returning -ELINKCONG
477  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
478  */
479 int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
480 {
481         struct tipc_link *l = tipc_bc_sndlink(net);
482         struct sk_buff_head xmitq, inputq, rcvq;
483         int rc = 0;
484
485         __skb_queue_head_init(&rcvq);
486         __skb_queue_head_init(&xmitq);
487         skb_queue_head_init(&inputq);
488
489         /* Prepare message clone for local node */
490         if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
491                 return -EHOSTUNREACH;
492
493         tipc_bcast_lock(net);
494         if (tipc_link_bc_peers(l))
495                 rc = tipc_link_xmit(l, list, &xmitq);
496         bclink_set_last_sent(net);
497         tipc_bcast_unlock(net);
498
499         /* Don't send to local node if adding to link failed */
500         if (unlikely(rc)) {
501                 __skb_queue_purge(&rcvq);
502                 return rc;
503         }
504         /* Broadcast to all nodes, inluding local node */
505         tipc_bcbearer_xmit(net, &xmitq);
506         tipc_sk_mcast_rcv(net, &rcvq, &inputq);
507         __skb_queue_purge(list);
508         return 0;
509 }
510 /**
511  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
512  *
513  * Called with both sending node's lock and bclink_lock taken.
514  */
515 static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
516 {
517         struct tipc_net *tn = net_generic(node->net, tipc_net_id);
518
519         bclink_update_last_sent(node, seqno);
520         node->bclink.last_in = seqno;
521         node->bclink.oos_state = 0;
522         tn->bcl->stats.recv_info++;
523
524         /*
525          * Unicast an ACK periodically, ensuring that
526          * all nodes in the cluster don't ACK at the same time
527          */
528         if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
529                 tipc_link_proto_xmit(node_active_link(node, node->addr),
530                                      STATE_MSG, 0, 0, 0, 0);
531                 tn->bcl->stats.sent_acks++;
532         }
533 }
534
535 /**
536  * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
537  *
538  * RCU is locked, no other locks set
539  */
540 void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
541 {
542         struct tipc_net *tn = net_generic(net, tipc_net_id);
543         struct tipc_link *bcl = tn->bcl;
544         struct tipc_msg *msg = buf_msg(buf);
545         struct tipc_node *node;
546         u32 next_in;
547         u32 seqno;
548         int deferred = 0;
549         int pos = 0;
550         struct sk_buff *iskb;
551         struct sk_buff_head *arrvq, *inputq;
552
553         /* Screen out unwanted broadcast messages */
554         if (msg_mc_netid(msg) != tn->net_id)
555                 goto exit;
556
557         node = tipc_node_find(net, msg_prevnode(msg));
558         if (unlikely(!node))
559                 goto exit;
560         tipc_node_lock(node);
561         if (unlikely(!node->bclink.recv_permitted))
562                 goto unlock;
563
564         /* Handle broadcast protocol message */
565         if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
566                 if (msg_type(msg) != STATE_MSG)
567                         goto unlock;
568                 if (msg_destnode(msg) == tn->own_addr) {
569                         tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
570                         tipc_bclink_lock(net);
571                         bcl->stats.recv_nacks++;
572                         tn->bcbase->retransmit_to = node;
573                         bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
574                                               msg_bcgap_to(msg));
575                         tipc_bclink_unlock(net);
576                         tipc_node_unlock(node);
577                 } else {
578                         tipc_node_unlock(node);
579                         bclink_peek_nack(net, msg);
580                 }
581                 tipc_node_put(node);
582                 goto exit;
583         }
584         /* Handle in-sequence broadcast message */
585         seqno = msg_seqno(msg);
586         next_in = mod(node->bclink.last_in + 1);
587         arrvq = &tn->bcbase->arrvq;
588         inputq = &tn->bcbase->inputq;
589
590         if (likely(seqno == next_in)) {
591 receive:
592                 /* Deliver message to destination */
593                 if (likely(msg_isdata(msg))) {
594                         tipc_bclink_lock(net);
595                         bclink_accept_pkt(node, seqno);
596                         spin_lock_bh(&inputq->lock);
597                         __skb_queue_tail(arrvq, buf);
598                         spin_unlock_bh(&inputq->lock);
599                         node->action_flags |= TIPC_BCAST_MSG_EVT;
600                         tipc_bclink_unlock(net);
601                         tipc_node_unlock(node);
602                 } else if (msg_user(msg) == MSG_BUNDLER) {
603                         tipc_bclink_lock(net);
604                         bclink_accept_pkt(node, seqno);
605                         bcl->stats.recv_bundles++;
606                         bcl->stats.recv_bundled += msg_msgcnt(msg);
607                         pos = 0;
608                         while (tipc_msg_extract(buf, &iskb, &pos)) {
609                                 spin_lock_bh(&inputq->lock);
610                                 __skb_queue_tail(arrvq, iskb);
611                                 spin_unlock_bh(&inputq->lock);
612                         }
613                         node->action_flags |= TIPC_BCAST_MSG_EVT;
614                         tipc_bclink_unlock(net);
615                         tipc_node_unlock(node);
616                 } else if (msg_user(msg) == MSG_FRAGMENTER) {
617                         tipc_bclink_lock(net);
618                         bclink_accept_pkt(node, seqno);
619                         tipc_buf_append(&node->bclink.reasm_buf, &buf);
620                         if (unlikely(!buf && !node->bclink.reasm_buf)) {
621                                 tipc_bclink_unlock(net);
622                                 goto unlock;
623                         }
624                         bcl->stats.recv_fragments++;
625                         if (buf) {
626                                 bcl->stats.recv_fragmented++;
627                                 msg = buf_msg(buf);
628                                 tipc_bclink_unlock(net);
629                                 goto receive;
630                         }
631                         tipc_bclink_unlock(net);
632                         tipc_node_unlock(node);
633                 } else {
634                         tipc_bclink_lock(net);
635                         bclink_accept_pkt(node, seqno);
636                         tipc_bclink_unlock(net);
637                         tipc_node_unlock(node);
638                         kfree_skb(buf);
639                 }
640                 buf = NULL;
641
642                 /* Determine new synchronization state */
643                 tipc_node_lock(node);
644                 if (unlikely(!tipc_node_is_up(node)))
645                         goto unlock;
646
647                 if (node->bclink.last_in == node->bclink.last_sent)
648                         goto unlock;
649
650                 if (skb_queue_empty(&node->bclink.deferdq)) {
651                         node->bclink.oos_state = 1;
652                         goto unlock;
653                 }
654
655                 msg = buf_msg(skb_peek(&node->bclink.deferdq));
656                 seqno = msg_seqno(msg);
657                 next_in = mod(next_in + 1);
658                 if (seqno != next_in)
659                         goto unlock;
660
661                 /* Take in-sequence message from deferred queue & deliver it */
662                 buf = __skb_dequeue(&node->bclink.deferdq);
663                 goto receive;
664         }
665
666         /* Handle out-of-sequence broadcast message */
667         if (less(next_in, seqno)) {
668                 deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
669                                                buf);
670                 bclink_update_last_sent(node, seqno);
671                 buf = NULL;
672         }
673
674         tipc_bclink_lock(net);
675
676         if (deferred)
677                 bcl->stats.deferred_recv++;
678         else
679                 bcl->stats.duplicates++;
680
681         tipc_bclink_unlock(net);
682
683 unlock:
684         tipc_node_unlock(node);
685         tipc_node_put(node);
686 exit:
687         kfree_skb(buf);
688 }
689
690 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
691 {
692         return (n_ptr->bclink.recv_permitted &&
693                 (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
694 }
695
696
697 /**
698  * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
699  *
700  * Send packet over as many bearers as necessary to reach all nodes
701  * that have joined the broadcast link.
702  *
703  * Returns 0 (packet sent successfully) under all circumstances,
704  * since the broadcast link's pseudo-bearer never blocks
705  */
706 static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
707                               struct tipc_bearer *unused1,
708                               struct tipc_media_addr *unused2)
709 {
710         int bp_index;
711         struct tipc_msg *msg = buf_msg(buf);
712         struct tipc_net *tn = net_generic(net, tipc_net_id);
713         struct tipc_bcbearer *bcbearer = tn->bcbearer;
714         struct tipc_bc_base *bclink = tn->bcbase;
715
716         /* Prepare broadcast link message for reliable transmission,
717          * if first time trying to send it;
718          * preparation is skipped for broadcast link protocol messages
719          * since they are sent in an unreliable manner and don't need it
720          */
721         if (likely(!msg_non_seq(buf_msg(buf)))) {
722                 bcbuf_set_acks(buf, bclink->bcast_nodes.count);
723                 msg_set_non_seq(msg, 1);
724                 msg_set_mc_netid(msg, tn->net_id);
725                 tn->bcl->stats.sent_info++;
726                 if (WARN_ON(!bclink->bcast_nodes.count)) {
727                         dump_stack();
728                         return 0;
729                 }
730         }
731
732         /* Send buffer over bearers until all targets reached */
733         bcbearer->remains = bclink->bcast_nodes;
734
735         for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
736                 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
737                 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
738                 struct tipc_bearer *bp[2] = {p, s};
739                 struct tipc_bearer *b = bp[msg_link_selector(msg)];
740                 struct sk_buff *tbuf;
741
742                 if (!p)
743                         break; /* No more bearers to try */
744                 if (!b)
745                         b = p;
746                 tipc_nmap_diff(&bcbearer->remains, &b->nodes,
747                                &bcbearer->remains_new);
748                 if (bcbearer->remains_new.count == bcbearer->remains.count)
749                         continue; /* Nothing added by bearer pair */
750
751                 if (bp_index == 0) {
752                         /* Use original buffer for first bearer */
753                         tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
754                 } else {
755                         /* Avoid concurrent buffer access */
756                         tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
757                         if (!tbuf)
758                                 break;
759                         tipc_bearer_send(net, b->identity, tbuf,
760                                          &b->bcast_addr);
761                         kfree_skb(tbuf); /* Bearer keeps a clone */
762                 }
763                 if (bcbearer->remains_new.count == 0)
764                         break; /* All targets reached */
765
766                 bcbearer->remains = bcbearer->remains_new;
767         }
768
769         return 0;
770 }
771
772 static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq)
773 {
774         struct sk_buff *skb, *tmp;
775
776         skb_queue_walk_safe(xmitq, skb, tmp) {
777                 __skb_dequeue(xmitq);
778                 tipc_bcbearer_send(net, skb, NULL, NULL);
779
780                 /* Until we remove cloning in tipc_l2_send_msg(): */
781                 kfree_skb(skb);
782         }
783 }
784
785 /**
786  * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
787  */
788 void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
789                         u32 node, bool action)
790 {
791         struct tipc_net *tn = net_generic(net, tipc_net_id);
792         struct tipc_bcbearer *bcbearer = tn->bcbearer;
793         struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
794         struct tipc_bcbearer_pair *bp_curr;
795         struct tipc_bearer *b;
796         int b_index;
797         int pri;
798
799         tipc_bclink_lock(net);
800
801         if (action)
802                 tipc_nmap_add(nm_ptr, node);
803         else
804                 tipc_nmap_remove(nm_ptr, node);
805
806         /* Group bearers by priority (can assume max of two per priority) */
807         memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
808
809         rcu_read_lock();
810         for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
811                 b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
812                 if (!b || !b->nodes.count)
813                         continue;
814
815                 if (!bp_temp[b->priority].primary)
816                         bp_temp[b->priority].primary = b;
817                 else
818                         bp_temp[b->priority].secondary = b;
819         }
820         rcu_read_unlock();
821
822         /* Create array of bearer pairs for broadcasting */
823         bp_curr = bcbearer->bpairs;
824         memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
825
826         for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
827
828                 if (!bp_temp[pri].primary)
829                         continue;
830
831                 bp_curr->primary = bp_temp[pri].primary;
832
833                 if (bp_temp[pri].secondary) {
834                         if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
835                                             &bp_temp[pri].secondary->nodes)) {
836                                 bp_curr->secondary = bp_temp[pri].secondary;
837                         } else {
838                                 bp_curr++;
839                                 bp_curr->primary = bp_temp[pri].secondary;
840                         }
841                 }
842
843                 bp_curr++;
844         }
845
846         tipc_bclink_unlock(net);
847 }
848
849 static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
850                                       struct tipc_stats *stats)
851 {
852         int i;
853         struct nlattr *nest;
854
855         struct nla_map {
856                 __u32 key;
857                 __u32 val;
858         };
859
860         struct nla_map map[] = {
861                 {TIPC_NLA_STATS_RX_INFO, stats->recv_info},
862                 {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
863                 {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
864                 {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
865                 {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
866                 {TIPC_NLA_STATS_TX_INFO, stats->sent_info},
867                 {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
868                 {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
869                 {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
870                 {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
871                 {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
872                 {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
873                 {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
874                 {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
875                 {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
876                 {TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
877                 {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
878                 {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
879                 {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
880                         (stats->accu_queue_sz / stats->queue_sz_counts) : 0}
881         };
882
883         nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
884         if (!nest)
885                 return -EMSGSIZE;
886
887         for (i = 0; i <  ARRAY_SIZE(map); i++)
888                 if (nla_put_u32(skb, map[i].key, map[i].val))
889                         goto msg_full;
890
891         nla_nest_end(skb, nest);
892
893         return 0;
894 msg_full:
895         nla_nest_cancel(skb, nest);
896
897         return -EMSGSIZE;
898 }
899
900 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
901 {
902         int err;
903         void *hdr;
904         struct nlattr *attrs;
905         struct nlattr *prop;
906         struct tipc_net *tn = net_generic(net, tipc_net_id);
907         struct tipc_link *bcl = tn->bcl;
908
909         if (!bcl)
910                 return 0;
911
912         tipc_bclink_lock(net);
913
914         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
915                           NLM_F_MULTI, TIPC_NL_LINK_GET);
916         if (!hdr)
917                 return -EMSGSIZE;
918
919         attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
920         if (!attrs)
921                 goto msg_full;
922
923         /* The broadcast link is always up */
924         if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
925                 goto attr_msg_full;
926
927         if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
928                 goto attr_msg_full;
929         if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
930                 goto attr_msg_full;
931         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->rcv_nxt))
932                 goto attr_msg_full;
933         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->snd_nxt))
934                 goto attr_msg_full;
935
936         prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
937         if (!prop)
938                 goto attr_msg_full;
939         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->window))
940                 goto prop_msg_full;
941         nla_nest_end(msg->skb, prop);
942
943         err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
944         if (err)
945                 goto attr_msg_full;
946
947         tipc_bclink_unlock(net);
948         nla_nest_end(msg->skb, attrs);
949         genlmsg_end(msg->skb, hdr);
950
951         return 0;
952
953 prop_msg_full:
954         nla_nest_cancel(msg->skb, prop);
955 attr_msg_full:
956         nla_nest_cancel(msg->skb, attrs);
957 msg_full:
958         tipc_bclink_unlock(net);
959         genlmsg_cancel(msg->skb, hdr);
960
961         return -EMSGSIZE;
962 }
963
964 int tipc_bclink_reset_stats(struct net *net)
965 {
966         struct tipc_net *tn = net_generic(net, tipc_net_id);
967         struct tipc_link *bcl = tn->bcl;
968
969         if (!bcl)
970                 return -ENOPROTOOPT;
971
972         tipc_bclink_lock(net);
973         memset(&bcl->stats, 0, sizeof(bcl->stats));
974         tipc_bclink_unlock(net);
975         return 0;
976 }
977
978 int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
979 {
980         struct tipc_net *tn = net_generic(net, tipc_net_id);
981         struct tipc_link *bcl = tn->bcl;
982
983         if (!bcl)
984                 return -ENOPROTOOPT;
985         if (limit < BCLINK_WIN_MIN)
986                 limit = BCLINK_WIN_MIN;
987         if (limit > TIPC_MAX_LINK_WIN)
988                 return -EINVAL;
989         tipc_bclink_lock(net);
990         tipc_link_set_queue_limits(bcl, limit);
991         tipc_bclink_unlock(net);
992         return 0;
993 }
994
995 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
996 {
997         int err;
998         u32 win;
999         struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
1000
1001         if (!attrs[TIPC_NLA_LINK_PROP])
1002                 return -EINVAL;
1003
1004         err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP], props);
1005         if (err)
1006                 return err;
1007
1008         if (!props[TIPC_NLA_PROP_WIN])
1009                 return -EOPNOTSUPP;
1010
1011         win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1012
1013         return tipc_bclink_set_queue_limits(net, win);
1014 }
1015
1016 int tipc_bcast_init(struct net *net)
1017 {
1018         struct tipc_net *tn = tipc_net(net);
1019         struct tipc_bcbearer *bcb = NULL;
1020         struct tipc_bc_base *bb = NULL;
1021         struct tipc_link *l = NULL;
1022
1023         bcb = kzalloc(sizeof(*bcb), GFP_ATOMIC);
1024         if (!bcb)
1025                 goto enomem;
1026         tn->bcbearer = bcb;
1027
1028         bcb->bearer.window = BCLINK_WIN_DEFAULT;
1029         bcb->bearer.mtu = MAX_PKT_DEFAULT_MCAST;
1030         bcb->bearer.identity = MAX_BEARERS;
1031
1032         bcb->bearer.media = &bcb->media;
1033         bcb->media.send_msg = tipc_bcbearer_send;
1034         sprintf(bcb->media.name, "tipc-broadcast");
1035         strcpy(bcb->bearer.name, bcb->media.name);
1036
1037         bb = kzalloc(sizeof(*bb), GFP_ATOMIC);
1038         if (!bb)
1039                 goto enomem;
1040         tn->bcbase = bb;
1041         __skb_queue_head_init(&bb->arrvq);
1042         spin_lock_init(&tipc_net(net)->bclock);
1043         bb->node.net = net;
1044
1045         if (!tipc_link_bc_create(&bb->node,
1046                                  MAX_PKT_DEFAULT_MCAST,
1047                                  BCLINK_WIN_DEFAULT,
1048                                  0,
1049                                  &bb->inputq,
1050                                  &bb->namedq,
1051                                  &l))
1052                 goto enomem;
1053         bb->link = l;
1054         tn->bcl = l;
1055         rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcb->bearer);
1056         return 0;
1057 enomem:
1058         kfree(bcb);
1059         kfree(bb);
1060         kfree(l);
1061         return -ENOMEM;
1062 }
1063
1064 void tipc_bcast_reinit(struct net *net)
1065 {
1066         struct tipc_bc_base *b = tipc_bc_base(net);
1067
1068         msg_set_prevnode(b->link->pmsg, tipc_own_addr(net));
1069 }
1070
1071 void tipc_bcast_stop(struct net *net)
1072 {
1073         struct tipc_net *tn = net_generic(net, tipc_net_id);
1074
1075         tipc_bclink_lock(net);
1076         tipc_link_purge_queues(tn->bcl);
1077         tipc_bclink_unlock(net);
1078         RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
1079         synchronize_net();
1080         kfree(tn->bcbearer);
1081         kfree(tn->bcbase);
1082         kfree(tn->bcl);
1083 }
1084
1085 /**
1086  * tipc_nmap_add - add a node to a node map
1087  */
1088 static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
1089 {
1090         int n = tipc_node(node);
1091         int w = n / WSIZE;
1092         u32 mask = (1 << (n % WSIZE));
1093
1094         if ((nm_ptr->map[w] & mask) == 0) {
1095                 nm_ptr->count++;
1096                 nm_ptr->map[w] |= mask;
1097         }
1098 }
1099
1100 /**
1101  * tipc_nmap_remove - remove a node from a node map
1102  */
1103 static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
1104 {
1105         int n = tipc_node(node);
1106         int w = n / WSIZE;
1107         u32 mask = (1 << (n % WSIZE));
1108
1109         if ((nm_ptr->map[w] & mask) != 0) {
1110                 nm_ptr->map[w] &= ~mask;
1111                 nm_ptr->count--;
1112         }
1113 }
1114
1115 /**
1116  * tipc_nmap_diff - find differences between node maps
1117  * @nm_a: input node map A
1118  * @nm_b: input node map B
1119  * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
1120  */
1121 static void tipc_nmap_diff(struct tipc_node_map *nm_a,
1122                            struct tipc_node_map *nm_b,
1123                            struct tipc_node_map *nm_diff)
1124 {
1125         int stop = ARRAY_SIZE(nm_a->map);
1126         int w;
1127         int b;
1128         u32 map;
1129
1130         memset(nm_diff, 0, sizeof(*nm_diff));
1131         for (w = 0; w < stop; w++) {
1132                 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
1133                 nm_diff->map[w] = map;
1134                 if (map != 0) {
1135                         for (b = 0 ; b < WSIZE; b++) {
1136                                 if (map & (1 << b))
1137                                         nm_diff->count++;
1138                         }
1139                 }
1140         }
1141 }