28bcd7be23c6d81ba34904387994fca265b4b679
[cascardo/linux.git] / net / tipc / node.c
1 /*
2  * net/tipc/node.c: TIPC node management routines
3  *
4  * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
5  * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "node.h"
40 #include "name_distr.h"
41 #include "socket.h"
42 #include "bcast.h"
43 #include "discover.h"
44
45 /* Node FSM states and events:
46  */
47 enum {
48         SELF_DOWN_PEER_DOWN    = 0xdd,
49         SELF_UP_PEER_UP        = 0xaa,
50         SELF_DOWN_PEER_LEAVING = 0xd1,
51         SELF_UP_PEER_COMING    = 0xac,
52         SELF_COMING_PEER_UP    = 0xca,
53         SELF_LEAVING_PEER_DOWN = 0x1d,
54         NODE_FAILINGOVER       = 0xf0,
55         NODE_SYNCHING          = 0xcc
56 };
57
58 enum {
59         SELF_ESTABL_CONTACT_EVT = 0xece,
60         SELF_LOST_CONTACT_EVT   = 0x1ce,
61         PEER_ESTABL_CONTACT_EVT = 0x9ece,
62         PEER_LOST_CONTACT_EVT   = 0x91ce,
63         NODE_FAILOVER_BEGIN_EVT = 0xfbe,
64         NODE_FAILOVER_END_EVT   = 0xfee,
65         NODE_SYNCH_BEGIN_EVT    = 0xcbe,
66         NODE_SYNCH_END_EVT      = 0xcee
67 };
68
69 static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
70                                   struct sk_buff_head *xmitq,
71                                   struct tipc_media_addr **maddr);
72 static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73                                 bool delete);
74 static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75 static void node_established_contact(struct tipc_node *n_ptr);
76 static void tipc_node_delete(struct tipc_node *node);
77 static void tipc_node_timeout(unsigned long data);
78 static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
79
80 struct tipc_sock_conn {
81         u32 port;
82         u32 peer_port;
83         u32 peer_node;
84         struct list_head list;
85 };
86
87 static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = {
88         [TIPC_NLA_NODE_UNSPEC]          = { .type = NLA_UNSPEC },
89         [TIPC_NLA_NODE_ADDR]            = { .type = NLA_U32 },
90         [TIPC_NLA_NODE_UP]              = { .type = NLA_FLAG }
91 };
92
93 /*
94  * A trivial power-of-two bitmask technique is used for speed, since this
95  * operation is done for every incoming TIPC packet. The number of hash table
96  * entries has been chosen so that no hash chain exceeds 8 nodes and will
97  * usually be much smaller (typically only a single node).
98  */
99 static unsigned int tipc_hashfn(u32 addr)
100 {
101         return addr & (NODE_HTABLE_SIZE - 1);
102 }
103
104 static void tipc_node_kref_release(struct kref *kref)
105 {
106         struct tipc_node *node = container_of(kref, struct tipc_node, kref);
107
108         tipc_node_delete(node);
109 }
110
111 void tipc_node_put(struct tipc_node *node)
112 {
113         kref_put(&node->kref, tipc_node_kref_release);
114 }
115
116 static void tipc_node_get(struct tipc_node *node)
117 {
118         kref_get(&node->kref);
119 }
120
121 /*
122  * tipc_node_find - locate specified node object, if it exists
123  */
124 struct tipc_node *tipc_node_find(struct net *net, u32 addr)
125 {
126         struct tipc_net *tn = net_generic(net, tipc_net_id);
127         struct tipc_node *node;
128
129         if (unlikely(!in_own_cluster_exact(net, addr)))
130                 return NULL;
131
132         rcu_read_lock();
133         hlist_for_each_entry_rcu(node, &tn->node_htable[tipc_hashfn(addr)],
134                                  hash) {
135                 if (node->addr == addr) {
136                         tipc_node_get(node);
137                         rcu_read_unlock();
138                         return node;
139                 }
140         }
141         rcu_read_unlock();
142         return NULL;
143 }
144
145 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
146 {
147         struct tipc_net *tn = net_generic(net, tipc_net_id);
148         struct tipc_node *n_ptr, *temp_node;
149
150         spin_lock_bh(&tn->node_list_lock);
151         n_ptr = tipc_node_find(net, addr);
152         if (n_ptr)
153                 goto exit;
154         n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC);
155         if (!n_ptr) {
156                 pr_warn("Node creation failed, no memory\n");
157                 goto exit;
158         }
159         n_ptr->addr = addr;
160         n_ptr->net = net;
161         n_ptr->capabilities = capabilities;
162         kref_init(&n_ptr->kref);
163         spin_lock_init(&n_ptr->lock);
164         INIT_HLIST_NODE(&n_ptr->hash);
165         INIT_LIST_HEAD(&n_ptr->list);
166         INIT_LIST_HEAD(&n_ptr->publ_list);
167         INIT_LIST_HEAD(&n_ptr->conn_sks);
168         skb_queue_head_init(&n_ptr->bclink.namedq);
169         __skb_queue_head_init(&n_ptr->bclink.deferdq);
170         hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171         list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172                 if (n_ptr->addr < temp_node->addr)
173                         break;
174         }
175         list_add_tail_rcu(&n_ptr->list, &temp_node->list);
176         n_ptr->state = SELF_DOWN_PEER_LEAVING;
177         n_ptr->signature = INVALID_NODE_SIG;
178         n_ptr->active_links[0] = INVALID_BEARER_ID;
179         n_ptr->active_links[1] = INVALID_BEARER_ID;
180         tipc_node_get(n_ptr);
181         setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182         n_ptr->keepalive_intv = U32_MAX;
183 exit:
184         spin_unlock_bh(&tn->node_list_lock);
185         return n_ptr;
186 }
187
188 static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
189 {
190         unsigned long tol = l->tolerance;
191         unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
192         unsigned long keepalive_intv = msecs_to_jiffies(intv);
193
194         /* Link with lowest tolerance determines timer interval */
195         if (keepalive_intv < n->keepalive_intv)
196                 n->keepalive_intv = keepalive_intv;
197
198         /* Ensure link's abort limit corresponds to current interval */
199         l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv);
200 }
201
202 static void tipc_node_delete(struct tipc_node *node)
203 {
204         list_del_rcu(&node->list);
205         hlist_del_rcu(&node->hash);
206         kfree_rcu(node, rcu);
207 }
208
209 void tipc_node_stop(struct net *net)
210 {
211         struct tipc_net *tn = net_generic(net, tipc_net_id);
212         struct tipc_node *node, *t_node;
213
214         spin_lock_bh(&tn->node_list_lock);
215         list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
216                 if (del_timer(&node->timer))
217                         tipc_node_put(node);
218                 tipc_node_put(node);
219         }
220         spin_unlock_bh(&tn->node_list_lock);
221 }
222
223 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
224 {
225         struct tipc_node *node;
226         struct tipc_sock_conn *conn;
227         int err = 0;
228
229         if (in_own_node(net, dnode))
230                 return 0;
231
232         node = tipc_node_find(net, dnode);
233         if (!node) {
234                 pr_warn("Connecting sock to node 0x%x failed\n", dnode);
235                 return -EHOSTUNREACH;
236         }
237         conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
238         if (!conn) {
239                 err = -EHOSTUNREACH;
240                 goto exit;
241         }
242         conn->peer_node = dnode;
243         conn->port = port;
244         conn->peer_port = peer_port;
245
246         tipc_node_lock(node);
247         list_add_tail(&conn->list, &node->conn_sks);
248         tipc_node_unlock(node);
249 exit:
250         tipc_node_put(node);
251         return err;
252 }
253
254 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
255 {
256         struct tipc_node *node;
257         struct tipc_sock_conn *conn, *safe;
258
259         if (in_own_node(net, dnode))
260                 return;
261
262         node = tipc_node_find(net, dnode);
263         if (!node)
264                 return;
265
266         tipc_node_lock(node);
267         list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
268                 if (port != conn->port)
269                         continue;
270                 list_del(&conn->list);
271                 kfree(conn);
272         }
273         tipc_node_unlock(node);
274         tipc_node_put(node);
275 }
276
277 /* tipc_node_timeout - handle expiration of node timer
278  */
279 static void tipc_node_timeout(unsigned long data)
280 {
281         struct tipc_node *n = (struct tipc_node *)data;
282         struct tipc_link_entry *le;
283         struct sk_buff_head xmitq;
284         int bearer_id;
285         int rc = 0;
286
287         __skb_queue_head_init(&xmitq);
288
289         for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
290                 tipc_node_lock(n);
291                 le = &n->links[bearer_id];
292                 if (le->link) {
293                         /* Link tolerance may change asynchronously: */
294                         tipc_node_calculate_timer(n, le->link);
295                         rc = tipc_link_timeout(le->link, &xmitq);
296                 }
297                 tipc_node_unlock(n);
298                 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
299                 if (rc & TIPC_LINK_DOWN_EVT)
300                         tipc_node_link_down(n, bearer_id, false);
301         }
302         if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
303                 tipc_node_get(n);
304         tipc_node_put(n);
305 }
306
307 /**
308  * __tipc_node_link_up - handle addition of link
309  * Node lock must be held by caller
310  * Link becomes active (alone or shared) or standby, depending on its priority.
311  */
312 static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
313                                 struct sk_buff_head *xmitq)
314 {
315         int *slot0 = &n->active_links[0];
316         int *slot1 = &n->active_links[1];
317         struct tipc_link *ol = node_active_link(n, 0);
318         struct tipc_link *nl = n->links[bearer_id].link;
319
320         if (!nl)
321                 return;
322
323         tipc_link_fsm_evt(nl, LINK_ESTABLISH_EVT);
324         if (!tipc_link_is_up(nl))
325                 return;
326
327         n->working_links++;
328         n->action_flags |= TIPC_NOTIFY_LINK_UP;
329         n->link_id = nl->peer_bearer_id << 16 | bearer_id;
330
331         /* Leave room for tunnel header when returning 'mtu' to users: */
332         n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
333
334         tipc_bearer_add_dest(n->net, bearer_id, n->addr);
335
336         pr_debug("Established link <%s> on network plane %c\n",
337                  nl->name, nl->net_plane);
338
339         /* First link? => give it both slots */
340         if (!ol) {
341                 *slot0 = bearer_id;
342                 *slot1 = bearer_id;
343                 tipc_link_build_bcast_sync_msg(nl, xmitq);
344                 node_established_contact(n);
345                 return;
346         }
347
348         /* Second link => redistribute slots */
349         if (nl->priority > ol->priority) {
350                 pr_debug("Old link <%s> becomes standby\n", ol->name);
351                 *slot0 = bearer_id;
352                 *slot1 = bearer_id;
353         } else if (nl->priority == ol->priority) {
354                 *slot0 = bearer_id;
355         } else {
356                 pr_debug("New link <%s> is standby\n", nl->name);
357         }
358
359         /* Prepare synchronization with first link */
360         tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
361 }
362
363 /**
364  * tipc_node_link_up - handle addition of link
365  *
366  * Link becomes active (alone or shared) or standby, depending on its priority.
367  */
368 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
369                               struct sk_buff_head *xmitq)
370 {
371         tipc_node_lock(n);
372         __tipc_node_link_up(n, bearer_id, xmitq);
373         tipc_node_unlock(n);
374 }
375
376 /**
377  * __tipc_node_link_down - handle loss of link
378  */
379 static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
380                                   struct sk_buff_head *xmitq,
381                                   struct tipc_media_addr **maddr)
382 {
383         struct tipc_link_entry *le = &n->links[*bearer_id];
384         int *slot0 = &n->active_links[0];
385         int *slot1 = &n->active_links[1];
386         int i, highest = 0;
387         struct tipc_link *l, *_l, *tnl;
388
389         l = n->links[*bearer_id].link;
390         if (!l || tipc_link_is_reset(l))
391                 return;
392
393         n->working_links--;
394         n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
395         n->link_id = l->peer_bearer_id << 16 | *bearer_id;
396
397         tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
398
399         pr_debug("Lost link <%s> on network plane %c\n",
400                  l->name, l->net_plane);
401
402         /* Select new active link if any available */
403         *slot0 = INVALID_BEARER_ID;
404         *slot1 = INVALID_BEARER_ID;
405         for (i = 0; i < MAX_BEARERS; i++) {
406                 _l = n->links[i].link;
407                 if (!_l || !tipc_link_is_up(_l))
408                         continue;
409                 if (_l == l)
410                         continue;
411                 if (_l->priority < highest)
412                         continue;
413                 if (_l->priority > highest) {
414                         highest = _l->priority;
415                         *slot0 = i;
416                         *slot1 = i;
417                         continue;
418                 }
419                 *slot1 = i;
420         }
421
422         if (!tipc_node_is_up(n)) {
423                 if (tipc_link_peer_is_down(l))
424                         tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
425                 tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT);
426                 tipc_link_fsm_evt(l, LINK_RESET_EVT);
427                 tipc_link_reset(l);
428                 tipc_link_build_reset_msg(l, xmitq);
429                 *maddr = &n->links[*bearer_id].maddr;
430                 node_lost_contact(n, &le->inputq);
431                 return;
432         }
433
434         /* There is still a working link => initiate failover */
435         tnl = node_active_link(n, 0);
436         tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
437         tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
438         n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
439         tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
440         tipc_link_reset(l);
441         tipc_link_fsm_evt(l, LINK_RESET_EVT);
442         tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
443         tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
444         *maddr = &n->links[tnl->bearer_id].maddr;
445         *bearer_id = tnl->bearer_id;
446 }
447
448 static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
449 {
450         struct tipc_link_entry *le = &n->links[bearer_id];
451         struct tipc_link *l = le->link;
452         struct tipc_media_addr *maddr;
453         struct sk_buff_head xmitq;
454
455         if (!l)
456                 return;
457
458         __skb_queue_head_init(&xmitq);
459
460         tipc_node_lock(n);
461         if (!tipc_link_is_establishing(l)) {
462                 __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
463                 if (delete) {
464                         kfree(l);
465                         le->link = NULL;
466                         n->link_cnt--;
467                 }
468         } else {
469                 /* Defuse pending tipc_node_link_up() */
470                 tipc_link_fsm_evt(l, LINK_RESET_EVT);
471         }
472         tipc_node_unlock(n);
473         tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
474         tipc_sk_rcv(n->net, &le->inputq);
475 }
476
477 bool tipc_node_is_up(struct tipc_node *n)
478 {
479         return n->active_links[0] != INVALID_BEARER_ID;
480 }
481
482 void tipc_node_check_dest(struct net *net, u32 onode,
483                           struct tipc_bearer *b,
484                           u16 capabilities, u32 signature,
485                           struct tipc_media_addr *maddr,
486                           bool *respond, bool *dupl_addr)
487 {
488         struct tipc_node *n;
489         struct tipc_link *l;
490         struct tipc_link_entry *le;
491         bool addr_match = false;
492         bool sign_match = false;
493         bool link_up = false;
494         bool accept_addr = false;
495         bool reset = true;
496         char *if_name;
497
498         *dupl_addr = false;
499         *respond = false;
500
501         n = tipc_node_create(net, onode, capabilities);
502         if (!n)
503                 return;
504
505         tipc_node_lock(n);
506
507         le = &n->links[b->identity];
508
509         /* Prepare to validate requesting node's signature and media address */
510         l = le->link;
511         link_up = l && tipc_link_is_up(l);
512         addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr));
513         sign_match = (signature == n->signature);
514
515         /* These three flags give us eight permutations: */
516
517         if (sign_match && addr_match && link_up) {
518                 /* All is fine. Do nothing. */
519                 reset = false;
520         } else if (sign_match && addr_match && !link_up) {
521                 /* Respond. The link will come up in due time */
522                 *respond = true;
523         } else if (sign_match && !addr_match && link_up) {
524                 /* Peer has changed i/f address without rebooting.
525                  * If so, the link will reset soon, and the next
526                  * discovery will be accepted. So we can ignore it.
527                  * It may also be an cloned or malicious peer having
528                  * chosen the same node address and signature as an
529                  * existing one.
530                  * Ignore requests until the link goes down, if ever.
531                  */
532                 *dupl_addr = true;
533         } else if (sign_match && !addr_match && !link_up) {
534                 /* Peer link has changed i/f address without rebooting.
535                  * It may also be a cloned or malicious peer; we can't
536                  * distinguish between the two.
537                  * The signature is correct, so we must accept.
538                  */
539                 accept_addr = true;
540                 *respond = true;
541         } else if (!sign_match && addr_match && link_up) {
542                 /* Peer node rebooted. Two possibilities:
543                  *  - Delayed re-discovery; this link endpoint has already
544                  *    reset and re-established contact with the peer, before
545                  *    receiving a discovery message from that node.
546                  *    (The peer happened to receive one from this node first).
547                  *  - The peer came back so fast that our side has not
548                  *    discovered it yet. Probing from this side will soon
549                  *    reset the link, since there can be no working link
550                  *    endpoint at the peer end, and the link will re-establish.
551                  *  Accept the signature, since it comes from a known peer.
552                  */
553                 n->signature = signature;
554         } else if (!sign_match && addr_match && !link_up) {
555                 /*  The peer node has rebooted.
556                  *  Accept signature, since it is a known peer.
557                  */
558                 n->signature = signature;
559                 *respond = true;
560         } else if (!sign_match && !addr_match && link_up) {
561                 /* Peer rebooted with new address, or a new/duplicate peer.
562                  * Ignore until the link goes down, if ever.
563                  */
564                 *dupl_addr = true;
565         } else if (!sign_match && !addr_match && !link_up) {
566                 /* Peer rebooted with new address, or it is a new peer.
567                  * Accept signature and address.
568                  */
569                 n->signature = signature;
570                 accept_addr = true;
571                 *respond = true;
572         }
573
574         if (!accept_addr)
575                 goto exit;
576
577         /* Now create new link if not already existing */
578         if (!l) {
579                 if (n->link_cnt == 2) {
580                         pr_warn("Cannot establish 3rd link to %x\n", n->addr);
581                         goto exit;
582                 }
583                 if_name = strchr(b->name, ':') + 1;
584                 if (!tipc_link_create(n, if_name, b->identity, b->tolerance,
585                                       b->net_plane, b->mtu, b->priority,
586                                       b->window, mod(tipc_net(net)->random),
587                                       tipc_own_addr(net), onode,
588                                       n->capabilities,
589                                       &le->maddr, &le->inputq,
590                                       &n->bclink.namedq, &l)) {
591                         *respond = false;
592                         goto exit;
593                 }
594                 tipc_link_reset(l);
595                 tipc_link_fsm_evt(l, LINK_RESET_EVT);
596                 if (n->state == NODE_FAILINGOVER)
597                         tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
598                 le->link = l;
599                 n->link_cnt++;
600                 tipc_node_calculate_timer(n, l);
601                 if (n->link_cnt == 1)
602                         if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
603                                 tipc_node_get(n);
604         }
605         memcpy(&le->maddr, maddr, sizeof(*maddr));
606 exit:
607         tipc_node_unlock(n);
608         if (reset && !tipc_link_is_reset(l))
609                 tipc_node_link_down(n, b->identity, false);
610         tipc_node_put(n);
611 }
612
613 void tipc_node_delete_links(struct net *net, int bearer_id)
614 {
615         struct tipc_net *tn = net_generic(net, tipc_net_id);
616         struct tipc_node *n;
617
618         rcu_read_lock();
619         list_for_each_entry_rcu(n, &tn->node_list, list) {
620                 tipc_node_link_down(n, bearer_id, true);
621         }
622         rcu_read_unlock();
623 }
624
625 static void tipc_node_reset_links(struct tipc_node *n)
626 {
627         char addr_string[16];
628         int i;
629
630         pr_warn("Resetting all links to %s\n",
631                 tipc_addr_string_fill(addr_string, n->addr));
632
633         for (i = 0; i < MAX_BEARERS; i++) {
634                 tipc_node_link_down(n, i, false);
635         }
636 }
637
638 /* tipc_node_fsm_evt - node finite state machine
639  * Determines when contact is allowed with peer node
640  */
641 static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
642 {
643         int state = n->state;
644
645         switch (state) {
646         case SELF_DOWN_PEER_DOWN:
647                 switch (evt) {
648                 case SELF_ESTABL_CONTACT_EVT:
649                         state = SELF_UP_PEER_COMING;
650                         break;
651                 case PEER_ESTABL_CONTACT_EVT:
652                         state = SELF_COMING_PEER_UP;
653                         break;
654                 case SELF_LOST_CONTACT_EVT:
655                 case PEER_LOST_CONTACT_EVT:
656                         break;
657                 case NODE_SYNCH_END_EVT:
658                 case NODE_SYNCH_BEGIN_EVT:
659                 case NODE_FAILOVER_BEGIN_EVT:
660                 case NODE_FAILOVER_END_EVT:
661                 default:
662                         goto illegal_evt;
663                 }
664                 break;
665         case SELF_UP_PEER_UP:
666                 switch (evt) {
667                 case SELF_LOST_CONTACT_EVT:
668                         state = SELF_DOWN_PEER_LEAVING;
669                         break;
670                 case PEER_LOST_CONTACT_EVT:
671                         state = SELF_LEAVING_PEER_DOWN;
672                         break;
673                 case NODE_SYNCH_BEGIN_EVT:
674                         state = NODE_SYNCHING;
675                         break;
676                 case NODE_FAILOVER_BEGIN_EVT:
677                         state = NODE_FAILINGOVER;
678                         break;
679                 case SELF_ESTABL_CONTACT_EVT:
680                 case PEER_ESTABL_CONTACT_EVT:
681                 case NODE_SYNCH_END_EVT:
682                 case NODE_FAILOVER_END_EVT:
683                         break;
684                 default:
685                         goto illegal_evt;
686                 }
687                 break;
688         case SELF_DOWN_PEER_LEAVING:
689                 switch (evt) {
690                 case PEER_LOST_CONTACT_EVT:
691                         state = SELF_DOWN_PEER_DOWN;
692                         break;
693                 case SELF_ESTABL_CONTACT_EVT:
694                 case PEER_ESTABL_CONTACT_EVT:
695                 case SELF_LOST_CONTACT_EVT:
696                         break;
697                 case NODE_SYNCH_END_EVT:
698                 case NODE_SYNCH_BEGIN_EVT:
699                 case NODE_FAILOVER_BEGIN_EVT:
700                 case NODE_FAILOVER_END_EVT:
701                 default:
702                         goto illegal_evt;
703                 }
704                 break;
705         case SELF_UP_PEER_COMING:
706                 switch (evt) {
707                 case PEER_ESTABL_CONTACT_EVT:
708                         state = SELF_UP_PEER_UP;
709                         break;
710                 case SELF_LOST_CONTACT_EVT:
711                         state = SELF_DOWN_PEER_LEAVING;
712                         break;
713                 case SELF_ESTABL_CONTACT_EVT:
714                 case PEER_LOST_CONTACT_EVT:
715                 case NODE_SYNCH_END_EVT:
716                 case NODE_FAILOVER_BEGIN_EVT:
717                         break;
718                 case NODE_SYNCH_BEGIN_EVT:
719                 case NODE_FAILOVER_END_EVT:
720                 default:
721                         goto illegal_evt;
722                 }
723                 break;
724         case SELF_COMING_PEER_UP:
725                 switch (evt) {
726                 case SELF_ESTABL_CONTACT_EVT:
727                         state = SELF_UP_PEER_UP;
728                         break;
729                 case PEER_LOST_CONTACT_EVT:
730                         state = SELF_LEAVING_PEER_DOWN;
731                         break;
732                 case SELF_LOST_CONTACT_EVT:
733                 case PEER_ESTABL_CONTACT_EVT:
734                         break;
735                 case NODE_SYNCH_END_EVT:
736                 case NODE_SYNCH_BEGIN_EVT:
737                 case NODE_FAILOVER_BEGIN_EVT:
738                 case NODE_FAILOVER_END_EVT:
739                 default:
740                         goto illegal_evt;
741                 }
742                 break;
743         case SELF_LEAVING_PEER_DOWN:
744                 switch (evt) {
745                 case SELF_LOST_CONTACT_EVT:
746                         state = SELF_DOWN_PEER_DOWN;
747                         break;
748                 case SELF_ESTABL_CONTACT_EVT:
749                 case PEER_ESTABL_CONTACT_EVT:
750                 case PEER_LOST_CONTACT_EVT:
751                         break;
752                 case NODE_SYNCH_END_EVT:
753                 case NODE_SYNCH_BEGIN_EVT:
754                 case NODE_FAILOVER_BEGIN_EVT:
755                 case NODE_FAILOVER_END_EVT:
756                 default:
757                         goto illegal_evt;
758                 }
759                 break;
760         case NODE_FAILINGOVER:
761                 switch (evt) {
762                 case SELF_LOST_CONTACT_EVT:
763                         state = SELF_DOWN_PEER_LEAVING;
764                         break;
765                 case PEER_LOST_CONTACT_EVT:
766                         state = SELF_LEAVING_PEER_DOWN;
767                         break;
768                 case NODE_FAILOVER_END_EVT:
769                         state = SELF_UP_PEER_UP;
770                         break;
771                 case NODE_FAILOVER_BEGIN_EVT:
772                 case SELF_ESTABL_CONTACT_EVT:
773                 case PEER_ESTABL_CONTACT_EVT:
774                         break;
775                 case NODE_SYNCH_BEGIN_EVT:
776                 case NODE_SYNCH_END_EVT:
777                 default:
778                         goto illegal_evt;
779                 }
780                 break;
781         case NODE_SYNCHING:
782                 switch (evt) {
783                 case SELF_LOST_CONTACT_EVT:
784                         state = SELF_DOWN_PEER_LEAVING;
785                         break;
786                 case PEER_LOST_CONTACT_EVT:
787                         state = SELF_LEAVING_PEER_DOWN;
788                         break;
789                 case NODE_SYNCH_END_EVT:
790                         state = SELF_UP_PEER_UP;
791                         break;
792                 case NODE_FAILOVER_BEGIN_EVT:
793                         state = NODE_FAILINGOVER;
794                         break;
795                 case NODE_SYNCH_BEGIN_EVT:
796                 case SELF_ESTABL_CONTACT_EVT:
797                 case PEER_ESTABL_CONTACT_EVT:
798                         break;
799                 case NODE_FAILOVER_END_EVT:
800                 default:
801                         goto illegal_evt;
802                 }
803                 break;
804         default:
805                 pr_err("Unknown node fsm state %x\n", state);
806                 break;
807         }
808         n->state = state;
809         return;
810
811 illegal_evt:
812         pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
813 }
814
815 bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
816 {
817         int state = n->state;
818
819         if (likely(state == SELF_UP_PEER_UP))
820                 return true;
821
822         if (state == SELF_LEAVING_PEER_DOWN)
823                 return false;
824
825         if (state == SELF_DOWN_PEER_LEAVING) {
826                 if (msg_peer_node_is_up(hdr))
827                         return false;
828         }
829
830         return true;
831 }
832
833 static void node_established_contact(struct tipc_node *n_ptr)
834 {
835         tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
836         n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
837         n_ptr->bclink.oos_state = 0;
838         n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
839         tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
840 }
841
842 static void node_lost_contact(struct tipc_node *n_ptr,
843                               struct sk_buff_head *inputq)
844 {
845         char addr_string[16];
846         struct tipc_sock_conn *conn, *safe;
847         struct tipc_link *l;
848         struct list_head *conns = &n_ptr->conn_sks;
849         struct sk_buff *skb;
850         struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
851         uint i;
852
853         pr_debug("Lost contact with %s\n",
854                  tipc_addr_string_fill(addr_string, n_ptr->addr));
855
856         /* Flush broadcast link info associated with lost node */
857         if (n_ptr->bclink.recv_permitted) {
858                 __skb_queue_purge(&n_ptr->bclink.deferdq);
859
860                 if (n_ptr->bclink.reasm_buf) {
861                         kfree_skb(n_ptr->bclink.reasm_buf);
862                         n_ptr->bclink.reasm_buf = NULL;
863                 }
864
865                 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
866                 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
867
868                 n_ptr->bclink.recv_permitted = false;
869         }
870
871         /* Abort any ongoing link failover */
872         for (i = 0; i < MAX_BEARERS; i++) {
873                 l = n_ptr->links[i].link;
874                 if (l)
875                         tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
876         }
877
878         /* Notify publications from this node */
879         n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
880
881         /* Notify sockets connected to node */
882         list_for_each_entry_safe(conn, safe, conns, list) {
883                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
884                                       SHORT_H_SIZE, 0, tn->own_addr,
885                                       conn->peer_node, conn->port,
886                                       conn->peer_port, TIPC_ERR_NO_NODE);
887                 if (likely(skb))
888                         skb_queue_tail(inputq, skb);
889                 list_del(&conn->list);
890                 kfree(conn);
891         }
892 }
893
894 /**
895  * tipc_node_get_linkname - get the name of a link
896  *
897  * @bearer_id: id of the bearer
898  * @node: peer node address
899  * @linkname: link name output buffer
900  *
901  * Returns 0 on success
902  */
903 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
904                            char *linkname, size_t len)
905 {
906         struct tipc_link *link;
907         int err = -EINVAL;
908         struct tipc_node *node = tipc_node_find(net, addr);
909
910         if (!node)
911                 return err;
912
913         if (bearer_id >= MAX_BEARERS)
914                 goto exit;
915
916         tipc_node_lock(node);
917         link = node->links[bearer_id].link;
918         if (link) {
919                 strncpy(linkname, link->name, len);
920                 err = 0;
921         }
922 exit:
923         tipc_node_unlock(node);
924         tipc_node_put(node);
925         return err;
926 }
927
928 void tipc_node_unlock(struct tipc_node *node)
929 {
930         struct net *net = node->net;
931         u32 addr = 0;
932         u32 flags = node->action_flags;
933         u32 link_id = 0;
934         struct list_head *publ_list;
935
936         if (likely(!flags)) {
937                 spin_unlock_bh(&node->lock);
938                 return;
939         }
940
941         addr = node->addr;
942         link_id = node->link_id;
943         publ_list = &node->publ_list;
944
945         node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
946                                 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
947                                 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
948                                 TIPC_BCAST_RESET);
949
950         spin_unlock_bh(&node->lock);
951
952         if (flags & TIPC_NOTIFY_NODE_DOWN)
953                 tipc_publ_notify(net, publ_list, addr);
954
955         if (flags & TIPC_WAKEUP_BCAST_USERS)
956                 tipc_bclink_wakeup_users(net);
957
958         if (flags & TIPC_NOTIFY_NODE_UP)
959                 tipc_named_node_up(net, addr);
960
961         if (flags & TIPC_NOTIFY_LINK_UP)
962                 tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
963                                      TIPC_NODE_SCOPE, link_id, addr);
964
965         if (flags & TIPC_NOTIFY_LINK_DOWN)
966                 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
967                                       link_id, addr);
968
969         if (flags & TIPC_BCAST_MSG_EVT)
970                 tipc_bclink_input(net);
971
972         if (flags & TIPC_BCAST_RESET)
973                 tipc_node_reset_links(node);
974 }
975
976 /* Caller should hold node lock for the passed node */
977 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
978 {
979         void *hdr;
980         struct nlattr *attrs;
981
982         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
983                           NLM_F_MULTI, TIPC_NL_NODE_GET);
984         if (!hdr)
985                 return -EMSGSIZE;
986
987         attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE);
988         if (!attrs)
989                 goto msg_full;
990
991         if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
992                 goto attr_msg_full;
993         if (tipc_node_is_up(node))
994                 if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
995                         goto attr_msg_full;
996
997         nla_nest_end(msg->skb, attrs);
998         genlmsg_end(msg->skb, hdr);
999
1000         return 0;
1001
1002 attr_msg_full:
1003         nla_nest_cancel(msg->skb, attrs);
1004 msg_full:
1005         genlmsg_cancel(msg->skb, hdr);
1006
1007         return -EMSGSIZE;
1008 }
1009
1010 static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
1011                                                int *bearer_id,
1012                                                struct tipc_media_addr **maddr)
1013 {
1014         int id = n->active_links[sel & 1];
1015
1016         if (unlikely(id < 0))
1017                 return NULL;
1018
1019         *bearer_id = id;
1020         *maddr = &n->links[id].maddr;
1021         return n->links[id].link;
1022 }
1023
1024 /**
1025  * tipc_node_xmit() is the general link level function for message sending
1026  * @net: the applicable net namespace
1027  * @list: chain of buffers containing message
1028  * @dnode: address of destination node
1029  * @selector: a number used for deterministic link selection
1030  * Consumes the buffer chain, except when returning -ELINKCONG
1031  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
1032  */
1033 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1034                    u32 dnode, int selector)
1035 {
1036         struct tipc_link *l = NULL;
1037         struct tipc_node *n;
1038         struct sk_buff_head xmitq;
1039         struct tipc_media_addr *maddr;
1040         int bearer_id;
1041         int rc = -EHOSTUNREACH;
1042
1043         __skb_queue_head_init(&xmitq);
1044         n = tipc_node_find(net, dnode);
1045         if (likely(n)) {
1046                 tipc_node_lock(n);
1047                 l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
1048                 if (likely(l))
1049                         rc = tipc_link_xmit(l, list, &xmitq);
1050                 tipc_node_unlock(n);
1051                 if (unlikely(rc == -ENOBUFS))
1052                         tipc_node_link_down(n, bearer_id, false);
1053                 tipc_node_put(n);
1054         }
1055         if (likely(!rc)) {
1056                 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
1057                 return 0;
1058         }
1059         if (likely(in_own_node(net, dnode))) {
1060                 tipc_sk_rcv(net, list);
1061                 return 0;
1062         }
1063         return rc;
1064 }
1065
1066 /* tipc_node_xmit_skb(): send single buffer to destination
1067  * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
1068  * messages, which will not be rejected
1069  * The only exception is datagram messages rerouted after secondary
1070  * lookup, which are rare and safe to dispose of anyway.
1071  * TODO: Return real return value, and let callers use
1072  * tipc_wait_for_sendpkt() where applicable
1073  */
1074 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1075                        u32 selector)
1076 {
1077         struct sk_buff_head head;
1078         int rc;
1079
1080         skb_queue_head_init(&head);
1081         __skb_queue_tail(&head, skb);
1082         rc = tipc_node_xmit(net, &head, dnode, selector);
1083         if (rc == -ELINKCONG)
1084                 kfree_skb(skb);
1085         return 0;
1086 }
1087
1088 /**
1089  * tipc_node_check_state - check and if necessary update node state
1090  * @skb: TIPC packet
1091  * @bearer_id: identity of bearer delivering the packet
1092  * Returns true if state is ok, otherwise consumes buffer and returns false
1093  */
1094 static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1095                                   int bearer_id, struct sk_buff_head *xmitq)
1096 {
1097         struct tipc_msg *hdr = buf_msg(skb);
1098         int usr = msg_user(hdr);
1099         int mtyp = msg_type(hdr);
1100         u16 oseqno = msg_seqno(hdr);
1101         u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
1102         u16 exp_pkts = msg_msgcnt(hdr);
1103         u16 rcv_nxt, syncpt, dlv_nxt;
1104         int state = n->state;
1105         struct tipc_link *l, *tnl, *pl = NULL;
1106         struct tipc_media_addr *maddr;
1107         int i, pb_id;
1108
1109         l = n->links[bearer_id].link;
1110         if (!l)
1111                 return false;
1112         rcv_nxt = l->rcv_nxt;
1113
1114
1115         if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
1116                 return true;
1117
1118         /* Find parallel link, if any */
1119         for (i = 0; i < MAX_BEARERS; i++) {
1120                 if ((i != bearer_id) && n->links[i].link) {
1121                         pl = n->links[i].link;
1122                         break;
1123                 }
1124         }
1125
1126         /* Update node accesibility if applicable */
1127         if (state == SELF_UP_PEER_COMING) {
1128                 if (!tipc_link_is_up(l))
1129                         return true;
1130                 if (!msg_peer_link_is_up(hdr))
1131                         return true;
1132                 tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
1133         }
1134
1135         if (state == SELF_DOWN_PEER_LEAVING) {
1136                 if (msg_peer_node_is_up(hdr))
1137                         return false;
1138                 tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
1139         }
1140
1141         /* Ignore duplicate packets */
1142         if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
1143                 return true;
1144
1145         /* Initiate or update failover mode if applicable */
1146         if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
1147                 syncpt = oseqno + exp_pkts - 1;
1148                 if (pl && tipc_link_is_up(pl)) {
1149                         pb_id = pl->bearer_id;
1150                         __tipc_node_link_down(n, &pb_id, xmitq, &maddr);
1151                         tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
1152                 }
1153                 /* If pkts arrive out of order, use lowest calculated syncpt */
1154                 if (less(syncpt, n->sync_point))
1155                         n->sync_point = syncpt;
1156         }
1157
1158         /* Open parallel link when tunnel link reaches synch point */
1159         if ((n->state == NODE_FAILINGOVER) && tipc_link_is_up(l)) {
1160                 if (!more(rcv_nxt, n->sync_point))
1161                         return true;
1162                 tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
1163                 if (pl)
1164                         tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT);
1165                 return true;
1166         }
1167
1168         /* No synching needed if only one link */
1169         if (!pl || !tipc_link_is_up(pl))
1170                 return true;
1171
1172         /* Initiate synch mode if applicable */
1173         if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) {
1174                 syncpt = iseqno + exp_pkts - 1;
1175                 if (!tipc_link_is_up(l)) {
1176                         tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
1177                         __tipc_node_link_up(n, bearer_id, xmitq);
1178                 }
1179                 if (n->state == SELF_UP_PEER_UP) {
1180                         n->sync_point = syncpt;
1181                         tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
1182                         tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
1183                 }
1184                 if (less(syncpt, n->sync_point))
1185                         n->sync_point = syncpt;
1186         }
1187
1188         /* Open tunnel link when parallel link reaches synch point */
1189         if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) {
1190                 if (tipc_link_is_synching(l)) {
1191                         tnl = l;
1192                 } else {
1193                         tnl = pl;
1194                         pl = l;
1195                 }
1196                 dlv_nxt = pl->rcv_nxt - mod(skb_queue_len(pl->inputq));
1197                 if (more(dlv_nxt, n->sync_point)) {
1198                         tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
1199                         tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
1200                         return true;
1201                 }
1202                 if (l == pl)
1203                         return true;
1204                 if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
1205                         return true;
1206                 if (usr == LINK_PROTOCOL)
1207                         return true;
1208                 return false;
1209         }
1210         return true;
1211 }
1212
1213 /**
1214  * tipc_rcv - process TIPC packets/messages arriving from off-node
1215  * @net: the applicable net namespace
1216  * @skb: TIPC packet
1217  * @bearer: pointer to bearer message arrived on
1218  *
1219  * Invoked with no locks held. Bearer pointer must point to a valid bearer
1220  * structure (i.e. cannot be NULL), but bearer can be inactive.
1221  */
1222 void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1223 {
1224         struct sk_buff_head xmitq;
1225         struct tipc_node *n;
1226         struct tipc_msg *hdr = buf_msg(skb);
1227         int usr = msg_user(hdr);
1228         int bearer_id = b->identity;
1229         struct tipc_link_entry *le;
1230         int rc = 0;
1231
1232         __skb_queue_head_init(&xmitq);
1233
1234         /* Ensure message is well-formed */
1235         if (unlikely(!tipc_msg_validate(skb)))
1236                 goto discard;
1237
1238         /* Handle arrival of a non-unicast link packet */
1239         if (unlikely(msg_non_seq(hdr))) {
1240                 if (usr ==  LINK_CONFIG)
1241                         tipc_disc_rcv(net, skb, b);
1242                 else
1243                         tipc_bclink_rcv(net, skb);
1244                 return;
1245         }
1246
1247         /* Locate neighboring node that sent packet */
1248         n = tipc_node_find(net, msg_prevnode(hdr));
1249         if (unlikely(!n))
1250                 goto discard;
1251         le = &n->links[bearer_id];
1252
1253         tipc_node_lock(n);
1254
1255         /* Is reception permitted at the moment ? */
1256         if (!tipc_node_filter_pkt(n, hdr))
1257                 goto unlock;
1258
1259         if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1260                 tipc_bclink_sync_state(n, hdr);
1261
1262         /* Release acked broadcast packets */
1263         if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1264                 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1265
1266         /* Check and if necessary update node state */
1267         if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1268                 rc = tipc_link_rcv(le->link, skb, &xmitq);
1269                 skb = NULL;
1270         }
1271 unlock:
1272         tipc_node_unlock(n);
1273
1274         if (unlikely(rc & TIPC_LINK_UP_EVT))
1275                 tipc_node_link_up(n, bearer_id, &xmitq);
1276
1277         if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1278                 tipc_node_link_down(n, bearer_id, false);
1279
1280         if (unlikely(!skb_queue_empty(&n->bclink.namedq)))
1281                 tipc_named_rcv(net, &n->bclink.namedq);
1282
1283         if (!skb_queue_empty(&le->inputq))
1284                 tipc_sk_rcv(net, &le->inputq);
1285
1286         if (!skb_queue_empty(&xmitq))
1287                 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1288
1289         tipc_node_put(n);
1290 discard:
1291         kfree_skb(skb);
1292 }
1293
1294 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
1295 {
1296         int err;
1297         struct net *net = sock_net(skb->sk);
1298         struct tipc_net *tn = net_generic(net, tipc_net_id);
1299         int done = cb->args[0];
1300         int last_addr = cb->args[1];
1301         struct tipc_node *node;
1302         struct tipc_nl_msg msg;
1303
1304         if (done)
1305                 return 0;
1306
1307         msg.skb = skb;
1308         msg.portid = NETLINK_CB(cb->skb).portid;
1309         msg.seq = cb->nlh->nlmsg_seq;
1310
1311         rcu_read_lock();
1312         if (last_addr) {
1313                 node = tipc_node_find(net, last_addr);
1314                 if (!node) {
1315                         rcu_read_unlock();
1316                         /* We never set seq or call nl_dump_check_consistent()
1317                          * this means that setting prev_seq here will cause the
1318                          * consistence check to fail in the netlink callback
1319                          * handler. Resulting in the NLMSG_DONE message having
1320                          * the NLM_F_DUMP_INTR flag set if the node state
1321                          * changed while we released the lock.
1322                          */
1323                         cb->prev_seq = 1;
1324                         return -EPIPE;
1325                 }
1326                 tipc_node_put(node);
1327         }
1328
1329         list_for_each_entry_rcu(node, &tn->node_list, list) {
1330                 if (last_addr) {
1331                         if (node->addr == last_addr)
1332                                 last_addr = 0;
1333                         else
1334                                 continue;
1335                 }
1336
1337                 tipc_node_lock(node);
1338                 err = __tipc_nl_add_node(&msg, node);
1339                 if (err) {
1340                         last_addr = node->addr;
1341                         tipc_node_unlock(node);
1342                         goto out;
1343                 }
1344
1345                 tipc_node_unlock(node);
1346         }
1347         done = 1;
1348 out:
1349         cb->args[0] = done;
1350         cb->args[1] = last_addr;
1351         rcu_read_unlock();
1352
1353         return skb->len;
1354 }