7c191641b44f64c080745df6615a8eccb237dd38
[cascardo/linux.git] / net / tipc / node.c
1 /*
2  * net/tipc/node.c: TIPC node management routines
3  *
4  * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
5  * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "link.h"
39 #include "node.h"
40 #include "name_distr.h"
41 #include "socket.h"
42 #include "bcast.h"
43 #include "discover.h"
44
45 /* Node FSM states and events:
46  */
47 enum {
48         SELF_DOWN_PEER_DOWN    = 0xdd,
49         SELF_UP_PEER_UP        = 0xaa,
50         SELF_DOWN_PEER_LEAVING = 0xd1,
51         SELF_UP_PEER_COMING    = 0xac,
52         SELF_COMING_PEER_UP    = 0xca,
53         SELF_LEAVING_PEER_DOWN = 0x1d,
54         NODE_FAILINGOVER       = 0xf0,
55         NODE_SYNCHING          = 0xcc
56 };
57
58 enum {
59         SELF_ESTABL_CONTACT_EVT = 0xece,
60         SELF_LOST_CONTACT_EVT   = 0x1ce,
61         PEER_ESTABL_CONTACT_EVT = 0x9ece,
62         PEER_LOST_CONTACT_EVT   = 0x91ce,
63         NODE_FAILOVER_BEGIN_EVT = 0xfbe,
64         NODE_FAILOVER_END_EVT   = 0xfee,
65         NODE_SYNCH_BEGIN_EVT    = 0xcbe,
66         NODE_SYNCH_END_EVT      = 0xcee
67 };
68
69 static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
70                                   struct sk_buff_head *xmitq,
71                                   struct tipc_media_addr **maddr);
72 static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73                                 bool delete);
74 static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75 static void node_established_contact(struct tipc_node *n_ptr);
76 static void tipc_node_delete(struct tipc_node *node);
77 static void tipc_node_timeout(unsigned long data);
78 static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
79
80 struct tipc_sock_conn {
81         u32 port;
82         u32 peer_port;
83         u32 peer_node;
84         struct list_head list;
85 };
86
87 static const struct nla_policy tipc_nl_node_policy[TIPC_NLA_NODE_MAX + 1] = {
88         [TIPC_NLA_NODE_UNSPEC]          = { .type = NLA_UNSPEC },
89         [TIPC_NLA_NODE_ADDR]            = { .type = NLA_U32 },
90         [TIPC_NLA_NODE_UP]              = { .type = NLA_FLAG }
91 };
92
93 /*
94  * A trivial power-of-two bitmask technique is used for speed, since this
95  * operation is done for every incoming TIPC packet. The number of hash table
96  * entries has been chosen so that no hash chain exceeds 8 nodes and will
97  * usually be much smaller (typically only a single node).
98  */
99 static unsigned int tipc_hashfn(u32 addr)
100 {
101         return addr & (NODE_HTABLE_SIZE - 1);
102 }
103
104 static void tipc_node_kref_release(struct kref *kref)
105 {
106         struct tipc_node *node = container_of(kref, struct tipc_node, kref);
107
108         tipc_node_delete(node);
109 }
110
111 void tipc_node_put(struct tipc_node *node)
112 {
113         kref_put(&node->kref, tipc_node_kref_release);
114 }
115
116 static void tipc_node_get(struct tipc_node *node)
117 {
118         kref_get(&node->kref);
119 }
120
121 /*
122  * tipc_node_find - locate specified node object, if it exists
123  */
124 struct tipc_node *tipc_node_find(struct net *net, u32 addr)
125 {
126         struct tipc_net *tn = net_generic(net, tipc_net_id);
127         struct tipc_node *node;
128
129         if (unlikely(!in_own_cluster_exact(net, addr)))
130                 return NULL;
131
132         rcu_read_lock();
133         hlist_for_each_entry_rcu(node, &tn->node_htable[tipc_hashfn(addr)],
134                                  hash) {
135                 if (node->addr == addr) {
136                         tipc_node_get(node);
137                         rcu_read_unlock();
138                         return node;
139                 }
140         }
141         rcu_read_unlock();
142         return NULL;
143 }
144
145 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
146 {
147         struct tipc_net *tn = net_generic(net, tipc_net_id);
148         struct tipc_node *n_ptr, *temp_node;
149
150         spin_lock_bh(&tn->node_list_lock);
151         n_ptr = tipc_node_find(net, addr);
152         if (n_ptr)
153                 goto exit;
154         n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC);
155         if (!n_ptr) {
156                 pr_warn("Node creation failed, no memory\n");
157                 goto exit;
158         }
159         n_ptr->addr = addr;
160         n_ptr->net = net;
161         n_ptr->capabilities = capabilities;
162         kref_init(&n_ptr->kref);
163         spin_lock_init(&n_ptr->lock);
164         INIT_HLIST_NODE(&n_ptr->hash);
165         INIT_LIST_HEAD(&n_ptr->list);
166         INIT_LIST_HEAD(&n_ptr->publ_list);
167         INIT_LIST_HEAD(&n_ptr->conn_sks);
168         skb_queue_head_init(&n_ptr->bclink.namedq);
169         __skb_queue_head_init(&n_ptr->bclink.deferdq);
170         hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171         list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172                 if (n_ptr->addr < temp_node->addr)
173                         break;
174         }
175         list_add_tail_rcu(&n_ptr->list, &temp_node->list);
176         n_ptr->state = SELF_DOWN_PEER_LEAVING;
177         n_ptr->signature = INVALID_NODE_SIG;
178         n_ptr->active_links[0] = INVALID_BEARER_ID;
179         n_ptr->active_links[1] = INVALID_BEARER_ID;
180         tipc_node_get(n_ptr);
181         setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182         n_ptr->keepalive_intv = U32_MAX;
183 exit:
184         spin_unlock_bh(&tn->node_list_lock);
185         return n_ptr;
186 }
187
188 static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
189 {
190         unsigned long tol = l->tolerance;
191         unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
192         unsigned long keepalive_intv = msecs_to_jiffies(intv);
193
194         /* Link with lowest tolerance determines timer interval */
195         if (keepalive_intv < n->keepalive_intv)
196                 n->keepalive_intv = keepalive_intv;
197
198         /* Ensure link's abort limit corresponds to current interval */
199         l->abort_limit = l->tolerance / jiffies_to_msecs(n->keepalive_intv);
200 }
201
202 static void tipc_node_delete(struct tipc_node *node)
203 {
204         list_del_rcu(&node->list);
205         hlist_del_rcu(&node->hash);
206         kfree_rcu(node, rcu);
207 }
208
209 void tipc_node_stop(struct net *net)
210 {
211         struct tipc_net *tn = net_generic(net, tipc_net_id);
212         struct tipc_node *node, *t_node;
213
214         spin_lock_bh(&tn->node_list_lock);
215         list_for_each_entry_safe(node, t_node, &tn->node_list, list) {
216                 if (del_timer(&node->timer))
217                         tipc_node_put(node);
218                 tipc_node_put(node);
219         }
220         spin_unlock_bh(&tn->node_list_lock);
221 }
222
223 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
224 {
225         struct tipc_node *node;
226         struct tipc_sock_conn *conn;
227         int err = 0;
228
229         if (in_own_node(net, dnode))
230                 return 0;
231
232         node = tipc_node_find(net, dnode);
233         if (!node) {
234                 pr_warn("Connecting sock to node 0x%x failed\n", dnode);
235                 return -EHOSTUNREACH;
236         }
237         conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
238         if (!conn) {
239                 err = -EHOSTUNREACH;
240                 goto exit;
241         }
242         conn->peer_node = dnode;
243         conn->port = port;
244         conn->peer_port = peer_port;
245
246         tipc_node_lock(node);
247         list_add_tail(&conn->list, &node->conn_sks);
248         tipc_node_unlock(node);
249 exit:
250         tipc_node_put(node);
251         return err;
252 }
253
254 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
255 {
256         struct tipc_node *node;
257         struct tipc_sock_conn *conn, *safe;
258
259         if (in_own_node(net, dnode))
260                 return;
261
262         node = tipc_node_find(net, dnode);
263         if (!node)
264                 return;
265
266         tipc_node_lock(node);
267         list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
268                 if (port != conn->port)
269                         continue;
270                 list_del(&conn->list);
271                 kfree(conn);
272         }
273         tipc_node_unlock(node);
274         tipc_node_put(node);
275 }
276
277 /* tipc_node_timeout - handle expiration of node timer
278  */
279 static void tipc_node_timeout(unsigned long data)
280 {
281         struct tipc_node *n = (struct tipc_node *)data;
282         struct tipc_link_entry *le;
283         struct sk_buff_head xmitq;
284         int bearer_id;
285         int rc = 0;
286
287         __skb_queue_head_init(&xmitq);
288
289         for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
290                 tipc_node_lock(n);
291                 le = &n->links[bearer_id];
292                 if (le->link) {
293                         /* Link tolerance may change asynchronously: */
294                         tipc_node_calculate_timer(n, le->link);
295                         rc = tipc_link_timeout(le->link, &xmitq);
296                 }
297                 tipc_node_unlock(n);
298                 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
299                 if (rc & TIPC_LINK_DOWN_EVT)
300                         tipc_node_link_down(n, bearer_id, false);
301         }
302         if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
303                 tipc_node_get(n);
304         tipc_node_put(n);
305 }
306
307 /**
308  * __tipc_node_link_up - handle addition of link
309  * Node lock must be held by caller
310  * Link becomes active (alone or shared) or standby, depending on its priority.
311  */
312 static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
313                                 struct sk_buff_head *xmitq)
314 {
315         int *slot0 = &n->active_links[0];
316         int *slot1 = &n->active_links[1];
317         struct tipc_link *ol = node_active_link(n, 0);
318         struct tipc_link *nl = n->links[bearer_id].link;
319
320         if (!nl || !tipc_link_is_up(nl))
321                 return;
322
323         n->working_links++;
324         n->action_flags |= TIPC_NOTIFY_LINK_UP;
325         n->link_id = nl->peer_bearer_id << 16 | bearer_id;
326
327         /* Leave room for tunnel header when returning 'mtu' to users: */
328         n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
329
330         tipc_bearer_add_dest(n->net, bearer_id, n->addr);
331
332         pr_debug("Established link <%s> on network plane %c\n",
333                  nl->name, nl->net_plane);
334
335         /* First link? => give it both slots */
336         if (!ol) {
337                 *slot0 = bearer_id;
338                 *slot1 = bearer_id;
339                 tipc_link_build_bcast_sync_msg(nl, xmitq);
340                 node_established_contact(n);
341                 return;
342         }
343
344         /* Second link => redistribute slots */
345         if (nl->priority > ol->priority) {
346                 pr_debug("Old link <%s> becomes standby\n", ol->name);
347                 *slot0 = bearer_id;
348                 *slot1 = bearer_id;
349         } else if (nl->priority == ol->priority) {
350                 *slot0 = bearer_id;
351         } else {
352                 pr_debug("New link <%s> is standby\n", nl->name);
353         }
354
355         /* Prepare synchronization with first link */
356         tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
357 }
358
359 /**
360  * tipc_node_link_up - handle addition of link
361  *
362  * Link becomes active (alone or shared) or standby, depending on its priority.
363  */
364 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
365                               struct sk_buff_head *xmitq)
366 {
367         tipc_node_lock(n);
368         __tipc_node_link_up(n, bearer_id, xmitq);
369         tipc_node_unlock(n);
370 }
371
372 /**
373  * __tipc_node_link_down - handle loss of link
374  */
375 static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
376                                   struct sk_buff_head *xmitq,
377                                   struct tipc_media_addr **maddr)
378 {
379         struct tipc_link_entry *le = &n->links[*bearer_id];
380         int *slot0 = &n->active_links[0];
381         int *slot1 = &n->active_links[1];
382         int i, highest = 0;
383         struct tipc_link *l, *_l, *tnl;
384
385         l = n->links[*bearer_id].link;
386         if (!l || tipc_link_is_reset(l))
387                 return;
388
389         n->working_links--;
390         n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
391         n->link_id = l->peer_bearer_id << 16 | *bearer_id;
392
393         tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
394
395         pr_debug("Lost link <%s> on network plane %c\n",
396                  l->name, l->net_plane);
397
398         /* Select new active link if any available */
399         *slot0 = INVALID_BEARER_ID;
400         *slot1 = INVALID_BEARER_ID;
401         for (i = 0; i < MAX_BEARERS; i++) {
402                 _l = n->links[i].link;
403                 if (!_l || !tipc_link_is_up(_l))
404                         continue;
405                 if (_l == l)
406                         continue;
407                 if (_l->priority < highest)
408                         continue;
409                 if (_l->priority > highest) {
410                         highest = _l->priority;
411                         *slot0 = i;
412                         *slot1 = i;
413                         continue;
414                 }
415                 *slot1 = i;
416         }
417
418         if (!tipc_node_is_up(n)) {
419                 tipc_link_reset(l);
420                 node_lost_contact(n, &le->inputq);
421                 return;
422         }
423
424         /* There is still a working link => initiate failover */
425         tnl = node_active_link(n, 0);
426         n->sync_point = tnl->rcv_nxt + (U16_MAX / 2 - 1);
427         tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
428         tipc_link_reset(l);
429         tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
430         tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
431         *maddr = &n->links[tnl->bearer_id].maddr;
432         *bearer_id = tnl->bearer_id;
433 }
434
435 static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
436 {
437         struct tipc_link_entry *le = &n->links[bearer_id];
438         struct tipc_media_addr *maddr;
439         struct sk_buff_head xmitq;
440
441         __skb_queue_head_init(&xmitq);
442
443         tipc_node_lock(n);
444         __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
445         if (delete && le->link) {
446                 kfree(le->link);
447                 le->link = NULL;
448                 n->link_cnt--;
449         }
450         tipc_node_unlock(n);
451
452         tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
453         tipc_sk_rcv(n->net, &le->inputq);
454 }
455
456 bool tipc_node_is_up(struct tipc_node *n)
457 {
458         return n->active_links[0] != INVALID_BEARER_ID;
459 }
460
461 void tipc_node_check_dest(struct net *net, u32 onode,
462                           struct tipc_bearer *b,
463                           u16 capabilities, u32 signature,
464                           struct tipc_media_addr *maddr,
465                           bool *respond, bool *dupl_addr)
466 {
467         struct tipc_node *n;
468         struct tipc_link *l;
469         struct tipc_link_entry *le;
470         bool addr_match = false;
471         bool sign_match = false;
472         bool link_up = false;
473         bool accept_addr = false;
474         bool reset = true;
475
476         *dupl_addr = false;
477         *respond = false;
478
479         n = tipc_node_create(net, onode, capabilities);
480         if (!n)
481                 return;
482
483         tipc_node_lock(n);
484
485         le = &n->links[b->identity];
486
487         /* Prepare to validate requesting node's signature and media address */
488         l = le->link;
489         link_up = l && tipc_link_is_up(l);
490         addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr));
491         sign_match = (signature == n->signature);
492
493         /* These three flags give us eight permutations: */
494
495         if (sign_match && addr_match && link_up) {
496                 /* All is fine. Do nothing. */
497                 reset = false;
498         } else if (sign_match && addr_match && !link_up) {
499                 /* Respond. The link will come up in due time */
500                 *respond = true;
501         } else if (sign_match && !addr_match && link_up) {
502                 /* Peer has changed i/f address without rebooting.
503                  * If so, the link will reset soon, and the next
504                  * discovery will be accepted. So we can ignore it.
505                  * It may also be an cloned or malicious peer having
506                  * chosen the same node address and signature as an
507                  * existing one.
508                  * Ignore requests until the link goes down, if ever.
509                  */
510                 *dupl_addr = true;
511         } else if (sign_match && !addr_match && !link_up) {
512                 /* Peer link has changed i/f address without rebooting.
513                  * It may also be a cloned or malicious peer; we can't
514                  * distinguish between the two.
515                  * The signature is correct, so we must accept.
516                  */
517                 accept_addr = true;
518                 *respond = true;
519         } else if (!sign_match && addr_match && link_up) {
520                 /* Peer node rebooted. Two possibilities:
521                  *  - Delayed re-discovery; this link endpoint has already
522                  *    reset and re-established contact with the peer, before
523                  *    receiving a discovery message from that node.
524                  *    (The peer happened to receive one from this node first).
525                  *  - The peer came back so fast that our side has not
526                  *    discovered it yet. Probing from this side will soon
527                  *    reset the link, since there can be no working link
528                  *    endpoint at the peer end, and the link will re-establish.
529                  *  Accept the signature, since it comes from a known peer.
530                  */
531                 n->signature = signature;
532         } else if (!sign_match && addr_match && !link_up) {
533                 /*  The peer node has rebooted.
534                  *  Accept signature, since it is a known peer.
535                  */
536                 n->signature = signature;
537                 *respond = true;
538         } else if (!sign_match && !addr_match && link_up) {
539                 /* Peer rebooted with new address, or a new/duplicate peer.
540                  * Ignore until the link goes down, if ever.
541                  */
542                 *dupl_addr = true;
543         } else if (!sign_match && !addr_match && !link_up) {
544                 /* Peer rebooted with new address, or it is a new peer.
545                  * Accept signature and address.
546                  */
547                 n->signature = signature;
548                 accept_addr = true;
549                 *respond = true;
550         }
551
552         if (!accept_addr)
553                 goto exit;
554
555         /* Now create new link if not already existing */
556         if (!l) {
557                 if (n->link_cnt == 2) {
558                         pr_warn("Cannot establish 3rd link to %x\n", n->addr);
559                         goto exit;
560                 }
561                 if (!tipc_link_create(n, b, mod(tipc_net(net)->random),
562                                       tipc_own_addr(net), onode, &le->maddr,
563                                       &le->inputq, &n->bclink.namedq, &l)) {
564                         *respond = false;
565                         goto exit;
566                 }
567                 tipc_link_reset(l);
568                 le->link = l;
569                 n->link_cnt++;
570                 tipc_node_calculate_timer(n, l);
571                 if (n->link_cnt == 1)
572                         if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
573                                 tipc_node_get(n);
574         }
575         memcpy(&le->maddr, maddr, sizeof(*maddr));
576 exit:
577         tipc_node_unlock(n);
578         if (reset)
579                 tipc_node_link_down(n, b->identity, false);
580         tipc_node_put(n);
581 }
582
583 void tipc_node_delete_links(struct net *net, int bearer_id)
584 {
585         struct tipc_net *tn = net_generic(net, tipc_net_id);
586         struct tipc_node *n;
587
588         rcu_read_lock();
589         list_for_each_entry_rcu(n, &tn->node_list, list) {
590                 tipc_node_link_down(n, bearer_id, true);
591         }
592         rcu_read_unlock();
593 }
594
595 static void tipc_node_reset_links(struct tipc_node *n)
596 {
597         char addr_string[16];
598         int i;
599
600         pr_warn("Resetting all links to %s\n",
601                 tipc_addr_string_fill(addr_string, n->addr));
602
603         for (i = 0; i < MAX_BEARERS; i++) {
604                 tipc_node_link_down(n, i, false);
605         }
606 }
607
608 /* tipc_node_fsm_evt - node finite state machine
609  * Determines when contact is allowed with peer node
610  */
611 static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
612 {
613         int state = n->state;
614
615         switch (state) {
616         case SELF_DOWN_PEER_DOWN:
617                 switch (evt) {
618                 case SELF_ESTABL_CONTACT_EVT:
619                         state = SELF_UP_PEER_COMING;
620                         break;
621                 case PEER_ESTABL_CONTACT_EVT:
622                         state = SELF_COMING_PEER_UP;
623                         break;
624                 case SELF_LOST_CONTACT_EVT:
625                 case PEER_LOST_CONTACT_EVT:
626                         break;
627                 case NODE_SYNCH_END_EVT:
628                 case NODE_SYNCH_BEGIN_EVT:
629                 case NODE_FAILOVER_BEGIN_EVT:
630                 case NODE_FAILOVER_END_EVT:
631                 default:
632                         goto illegal_evt;
633                 }
634                 break;
635         case SELF_UP_PEER_UP:
636                 switch (evt) {
637                 case SELF_LOST_CONTACT_EVT:
638                         state = SELF_DOWN_PEER_LEAVING;
639                         break;
640                 case PEER_LOST_CONTACT_EVT:
641                         state = SELF_LEAVING_PEER_DOWN;
642                         break;
643                 case NODE_SYNCH_BEGIN_EVT:
644                         state = NODE_SYNCHING;
645                         break;
646                 case NODE_FAILOVER_BEGIN_EVT:
647                         state = NODE_FAILINGOVER;
648                         break;
649                 case SELF_ESTABL_CONTACT_EVT:
650                 case PEER_ESTABL_CONTACT_EVT:
651                 case NODE_SYNCH_END_EVT:
652                 case NODE_FAILOVER_END_EVT:
653                         break;
654                 default:
655                         goto illegal_evt;
656                 }
657                 break;
658         case SELF_DOWN_PEER_LEAVING:
659                 switch (evt) {
660                 case PEER_LOST_CONTACT_EVT:
661                         state = SELF_DOWN_PEER_DOWN;
662                         break;
663                 case SELF_ESTABL_CONTACT_EVT:
664                 case PEER_ESTABL_CONTACT_EVT:
665                 case SELF_LOST_CONTACT_EVT:
666                         break;
667                 case NODE_SYNCH_END_EVT:
668                 case NODE_SYNCH_BEGIN_EVT:
669                 case NODE_FAILOVER_BEGIN_EVT:
670                 case NODE_FAILOVER_END_EVT:
671                 default:
672                         goto illegal_evt;
673                 }
674                 break;
675         case SELF_UP_PEER_COMING:
676                 switch (evt) {
677                 case PEER_ESTABL_CONTACT_EVT:
678                         state = SELF_UP_PEER_UP;
679                         break;
680                 case SELF_LOST_CONTACT_EVT:
681                         state = SELF_DOWN_PEER_LEAVING;
682                         break;
683                 case SELF_ESTABL_CONTACT_EVT:
684                 case PEER_LOST_CONTACT_EVT:
685                         break;
686                 case NODE_SYNCH_END_EVT:
687                 case NODE_SYNCH_BEGIN_EVT:
688                 case NODE_FAILOVER_BEGIN_EVT:
689                 case NODE_FAILOVER_END_EVT:
690                 default:
691                         goto illegal_evt;
692                 }
693                 break;
694         case SELF_COMING_PEER_UP:
695                 switch (evt) {
696                 case SELF_ESTABL_CONTACT_EVT:
697                         state = SELF_UP_PEER_UP;
698                         break;
699                 case PEER_LOST_CONTACT_EVT:
700                         state = SELF_LEAVING_PEER_DOWN;
701                         break;
702                 case SELF_LOST_CONTACT_EVT:
703                 case PEER_ESTABL_CONTACT_EVT:
704                         break;
705                 case NODE_SYNCH_END_EVT:
706                 case NODE_SYNCH_BEGIN_EVT:
707                 case NODE_FAILOVER_BEGIN_EVT:
708                 case NODE_FAILOVER_END_EVT:
709                 default:
710                         goto illegal_evt;
711                 }
712                 break;
713         case SELF_LEAVING_PEER_DOWN:
714                 switch (evt) {
715                 case SELF_LOST_CONTACT_EVT:
716                         state = SELF_DOWN_PEER_DOWN;
717                         break;
718                 case SELF_ESTABL_CONTACT_EVT:
719                 case PEER_ESTABL_CONTACT_EVT:
720                 case PEER_LOST_CONTACT_EVT:
721                         break;
722                 case NODE_SYNCH_END_EVT:
723                 case NODE_SYNCH_BEGIN_EVT:
724                 case NODE_FAILOVER_BEGIN_EVT:
725                 case NODE_FAILOVER_END_EVT:
726                 default:
727                         goto illegal_evt;
728                 }
729                 break;
730         case NODE_FAILINGOVER:
731                 switch (evt) {
732                 case SELF_LOST_CONTACT_EVT:
733                         state = SELF_DOWN_PEER_LEAVING;
734                         break;
735                 case PEER_LOST_CONTACT_EVT:
736                         state = SELF_LEAVING_PEER_DOWN;
737                         break;
738                 case NODE_FAILOVER_END_EVT:
739                         state = SELF_UP_PEER_UP;
740                         break;
741                 case NODE_FAILOVER_BEGIN_EVT:
742                 case SELF_ESTABL_CONTACT_EVT:
743                 case PEER_ESTABL_CONTACT_EVT:
744                         break;
745                 case NODE_SYNCH_BEGIN_EVT:
746                 case NODE_SYNCH_END_EVT:
747                 default:
748                         goto illegal_evt;
749                 }
750                 break;
751         case NODE_SYNCHING:
752                 switch (evt) {
753                 case SELF_LOST_CONTACT_EVT:
754                         state = SELF_DOWN_PEER_LEAVING;
755                         break;
756                 case PEER_LOST_CONTACT_EVT:
757                         state = SELF_LEAVING_PEER_DOWN;
758                         break;
759                 case NODE_SYNCH_END_EVT:
760                         state = SELF_UP_PEER_UP;
761                         break;
762                 case NODE_FAILOVER_BEGIN_EVT:
763                         state = NODE_FAILINGOVER;
764                         break;
765                 case NODE_SYNCH_BEGIN_EVT:
766                 case SELF_ESTABL_CONTACT_EVT:
767                 case PEER_ESTABL_CONTACT_EVT:
768                         break;
769                 case NODE_FAILOVER_END_EVT:
770                 default:
771                         goto illegal_evt;
772                 }
773                 break;
774         default:
775                 pr_err("Unknown node fsm state %x\n", state);
776                 break;
777         }
778         n->state = state;
779         return;
780
781 illegal_evt:
782         pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
783 }
784
785 bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
786 {
787         int state = n->state;
788
789         if (likely(state == SELF_UP_PEER_UP))
790                 return true;
791
792         if (state == SELF_LEAVING_PEER_DOWN)
793                 return false;
794
795         if (state == SELF_DOWN_PEER_LEAVING) {
796                 if (msg_peer_node_is_up(hdr))
797                         return false;
798         }
799
800         return true;
801 }
802
803 static void node_established_contact(struct tipc_node *n_ptr)
804 {
805         tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
806         n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
807         n_ptr->bclink.oos_state = 0;
808         n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
809         tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
810 }
811
812 static void node_lost_contact(struct tipc_node *n_ptr,
813                               struct sk_buff_head *inputq)
814 {
815         char addr_string[16];
816         struct tipc_sock_conn *conn, *safe;
817         struct tipc_link *l;
818         struct list_head *conns = &n_ptr->conn_sks;
819         struct sk_buff *skb;
820         struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
821         uint i;
822
823         pr_debug("Lost contact with %s\n",
824                  tipc_addr_string_fill(addr_string, n_ptr->addr));
825
826         /* Flush broadcast link info associated with lost node */
827         if (n_ptr->bclink.recv_permitted) {
828                 __skb_queue_purge(&n_ptr->bclink.deferdq);
829
830                 if (n_ptr->bclink.reasm_buf) {
831                         kfree_skb(n_ptr->bclink.reasm_buf);
832                         n_ptr->bclink.reasm_buf = NULL;
833                 }
834
835                 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
836                 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
837
838                 n_ptr->bclink.recv_permitted = false;
839         }
840
841         /* Abort any ongoing link failover */
842         for (i = 0; i < MAX_BEARERS; i++) {
843                 l = n_ptr->links[i].link;
844                 if (l)
845                         tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
846         }
847
848         /* Prevent re-contact with node until cleanup is done */
849         tipc_node_fsm_evt(n_ptr, SELF_LOST_CONTACT_EVT);
850
851         /* Notify publications from this node */
852         n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
853
854         /* Notify sockets connected to node */
855         list_for_each_entry_safe(conn, safe, conns, list) {
856                 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
857                                       SHORT_H_SIZE, 0, tn->own_addr,
858                                       conn->peer_node, conn->port,
859                                       conn->peer_port, TIPC_ERR_NO_NODE);
860                 if (likely(skb))
861                         skb_queue_tail(inputq, skb);
862                 list_del(&conn->list);
863                 kfree(conn);
864         }
865 }
866
867 /**
868  * tipc_node_get_linkname - get the name of a link
869  *
870  * @bearer_id: id of the bearer
871  * @node: peer node address
872  * @linkname: link name output buffer
873  *
874  * Returns 0 on success
875  */
876 int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
877                            char *linkname, size_t len)
878 {
879         struct tipc_link *link;
880         int err = -EINVAL;
881         struct tipc_node *node = tipc_node_find(net, addr);
882
883         if (!node)
884                 return err;
885
886         if (bearer_id >= MAX_BEARERS)
887                 goto exit;
888
889         tipc_node_lock(node);
890         link = node->links[bearer_id].link;
891         if (link) {
892                 strncpy(linkname, link->name, len);
893                 err = 0;
894         }
895 exit:
896         tipc_node_unlock(node);
897         tipc_node_put(node);
898         return err;
899 }
900
901 void tipc_node_unlock(struct tipc_node *node)
902 {
903         struct net *net = node->net;
904         u32 addr = 0;
905         u32 flags = node->action_flags;
906         u32 link_id = 0;
907         struct list_head *publ_list;
908
909         if (likely(!flags)) {
910                 spin_unlock_bh(&node->lock);
911                 return;
912         }
913
914         addr = node->addr;
915         link_id = node->link_id;
916         publ_list = &node->publ_list;
917
918         node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
919                                 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
920                                 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
921                                 TIPC_BCAST_RESET);
922
923         spin_unlock_bh(&node->lock);
924
925         if (flags & TIPC_NOTIFY_NODE_DOWN)
926                 tipc_publ_notify(net, publ_list, addr);
927
928         if (flags & TIPC_WAKEUP_BCAST_USERS)
929                 tipc_bclink_wakeup_users(net);
930
931         if (flags & TIPC_NOTIFY_NODE_UP)
932                 tipc_named_node_up(net, addr);
933
934         if (flags & TIPC_NOTIFY_LINK_UP)
935                 tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
936                                      TIPC_NODE_SCOPE, link_id, addr);
937
938         if (flags & TIPC_NOTIFY_LINK_DOWN)
939                 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
940                                       link_id, addr);
941
942         if (flags & TIPC_BCAST_MSG_EVT)
943                 tipc_bclink_input(net);
944
945         if (flags & TIPC_BCAST_RESET)
946                 tipc_node_reset_links(node);
947 }
948
949 /* Caller should hold node lock for the passed node */
950 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
951 {
952         void *hdr;
953         struct nlattr *attrs;
954
955         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
956                           NLM_F_MULTI, TIPC_NL_NODE_GET);
957         if (!hdr)
958                 return -EMSGSIZE;
959
960         attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE);
961         if (!attrs)
962                 goto msg_full;
963
964         if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
965                 goto attr_msg_full;
966         if (tipc_node_is_up(node))
967                 if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
968                         goto attr_msg_full;
969
970         nla_nest_end(msg->skb, attrs);
971         genlmsg_end(msg->skb, hdr);
972
973         return 0;
974
975 attr_msg_full:
976         nla_nest_cancel(msg->skb, attrs);
977 msg_full:
978         genlmsg_cancel(msg->skb, hdr);
979
980         return -EMSGSIZE;
981 }
982
983 static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
984                                                int *bearer_id,
985                                                struct tipc_media_addr **maddr)
986 {
987         int id = n->active_links[sel & 1];
988
989         if (unlikely(id < 0))
990                 return NULL;
991
992         *bearer_id = id;
993         *maddr = &n->links[id].maddr;
994         return n->links[id].link;
995 }
996
997 /**
998  * tipc_node_xmit() is the general link level function for message sending
999  * @net: the applicable net namespace
1000  * @list: chain of buffers containing message
1001  * @dnode: address of destination node
1002  * @selector: a number used for deterministic link selection
1003  * Consumes the buffer chain, except when returning -ELINKCONG
1004  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
1005  */
1006 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1007                    u32 dnode, int selector)
1008 {
1009         struct tipc_link *l = NULL;
1010         struct tipc_node *n;
1011         struct sk_buff_head xmitq;
1012         struct tipc_media_addr *maddr;
1013         int bearer_id;
1014         int rc = -EHOSTUNREACH;
1015
1016         __skb_queue_head_init(&xmitq);
1017         n = tipc_node_find(net, dnode);
1018         if (likely(n)) {
1019                 tipc_node_lock(n);
1020                 l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
1021                 if (likely(l))
1022                         rc = tipc_link_xmit(l, list, &xmitq);
1023                 tipc_node_unlock(n);
1024                 if (unlikely(rc == -ENOBUFS))
1025                         tipc_node_link_down(n, bearer_id, false);
1026                 tipc_node_put(n);
1027         }
1028         if (likely(!rc)) {
1029                 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
1030                 return 0;
1031         }
1032         if (likely(in_own_node(net, dnode))) {
1033                 tipc_sk_rcv(net, list);
1034                 return 0;
1035         }
1036         return rc;
1037 }
1038
1039 /* tipc_node_xmit_skb(): send single buffer to destination
1040  * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
1041  * messages, which will not be rejected
1042  * The only exception is datagram messages rerouted after secondary
1043  * lookup, which are rare and safe to dispose of anyway.
1044  * TODO: Return real return value, and let callers use
1045  * tipc_wait_for_sendpkt() where applicable
1046  */
1047 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1048                        u32 selector)
1049 {
1050         struct sk_buff_head head;
1051         int rc;
1052
1053         skb_queue_head_init(&head);
1054         __skb_queue_tail(&head, skb);
1055         rc = tipc_node_xmit(net, &head, dnode, selector);
1056         if (rc == -ELINKCONG)
1057                 kfree_skb(skb);
1058         return 0;
1059 }
1060
1061 /**
1062  * tipc_node_check_state - check and if necessary update node state
1063  * @skb: TIPC packet
1064  * @bearer_id: identity of bearer delivering the packet
1065  * Returns true if state is ok, otherwise consumes buffer and returns false
1066  */
1067 static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1068                                   int bearer_id, struct sk_buff_head *xmitq)
1069 {
1070         struct tipc_msg *hdr = buf_msg(skb);
1071         int usr = msg_user(hdr);
1072         int mtyp = msg_type(hdr);
1073         u16 oseqno = msg_seqno(hdr);
1074         u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
1075         u16 exp_pkts = msg_msgcnt(hdr);
1076         u16 rcv_nxt, syncpt, dlv_nxt;
1077         int state = n->state;
1078         struct tipc_link *l, *pl = NULL;
1079         struct tipc_media_addr *maddr;
1080         int i, pb_id;
1081
1082         l = n->links[bearer_id].link;
1083         if (!l)
1084                 return false;
1085         rcv_nxt = l->rcv_nxt;
1086
1087
1088         if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
1089                 return true;
1090
1091         /* Find parallel link, if any */
1092         for (i = 0; i < MAX_BEARERS; i++) {
1093                 if ((i != bearer_id) && n->links[i].link) {
1094                         pl = n->links[i].link;
1095                         break;
1096                 }
1097         }
1098
1099         /* Update node accesibility if applicable */
1100         if (state == SELF_UP_PEER_COMING) {
1101                 if (!tipc_link_is_up(l))
1102                         return true;
1103                 if (!msg_peer_link_is_up(hdr))
1104                         return true;
1105                 tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
1106         }
1107
1108         if (state == SELF_DOWN_PEER_LEAVING) {
1109                 if (msg_peer_node_is_up(hdr))
1110                         return false;
1111                 tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
1112         }
1113
1114         /* Ignore duplicate packets */
1115         if (less(oseqno, rcv_nxt))
1116                 return true;
1117
1118         /* Initiate or update failover mode if applicable */
1119         if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
1120                 syncpt = oseqno + exp_pkts - 1;
1121                 if (pl && tipc_link_is_up(pl)) {
1122                         pb_id = pl->bearer_id;
1123                         __tipc_node_link_down(n, &pb_id, xmitq, &maddr);
1124                         tipc_skb_queue_splice_tail_init(pl->inputq, l->inputq);
1125                 }
1126                 /* If pkts arrive out of order, use lowest calculated syncpt */
1127                 if (less(syncpt, n->sync_point))
1128                         n->sync_point = syncpt;
1129         }
1130
1131         /* Open parallel link when tunnel link reaches synch point */
1132         if ((n->state == NODE_FAILINGOVER) && !tipc_link_is_failingover(l)) {
1133                 if (!more(rcv_nxt, n->sync_point))
1134                         return true;
1135                 tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
1136                 if (pl)
1137                         tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT);
1138                 return true;
1139         }
1140
1141         /* Initiate or update synch mode if applicable */
1142         if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG)) {
1143                 syncpt = iseqno + exp_pkts - 1;
1144                 if (!tipc_link_is_up(l)) {
1145                         tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
1146                         __tipc_node_link_up(n, bearer_id, xmitq);
1147                 }
1148                 if (n->state == SELF_UP_PEER_UP) {
1149                         n->sync_point = syncpt;
1150                         tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
1151                         tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
1152                 }
1153                 if (less(syncpt, n->sync_point))
1154                         n->sync_point = syncpt;
1155         }
1156
1157         /* Open tunnel link when parallel link reaches synch point */
1158         if ((n->state == NODE_SYNCHING) && tipc_link_is_synching(l)) {
1159                 if (pl)
1160                         dlv_nxt = mod(pl->rcv_nxt - skb_queue_len(pl->inputq));
1161                 if (!pl || more(dlv_nxt, n->sync_point)) {
1162                         tipc_link_fsm_evt(l, LINK_SYNCH_END_EVT);
1163                         tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
1164                         return true;
1165                 }
1166                 if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
1167                         return true;
1168                 if (usr == LINK_PROTOCOL)
1169                         return true;
1170                 return false;
1171         }
1172         return true;
1173 }
1174
1175 /**
1176  * tipc_rcv - process TIPC packets/messages arriving from off-node
1177  * @net: the applicable net namespace
1178  * @skb: TIPC packet
1179  * @bearer: pointer to bearer message arrived on
1180  *
1181  * Invoked with no locks held. Bearer pointer must point to a valid bearer
1182  * structure (i.e. cannot be NULL), but bearer can be inactive.
1183  */
1184 void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1185 {
1186         struct sk_buff_head xmitq;
1187         struct tipc_node *n;
1188         struct tipc_msg *hdr = buf_msg(skb);
1189         int usr = msg_user(hdr);
1190         int bearer_id = b->identity;
1191         struct tipc_link_entry *le;
1192         int rc = 0;
1193
1194         __skb_queue_head_init(&xmitq);
1195
1196         /* Ensure message is well-formed */
1197         if (unlikely(!tipc_msg_validate(skb)))
1198                 goto discard;
1199
1200         /* Handle arrival of a non-unicast link packet */
1201         if (unlikely(msg_non_seq(hdr))) {
1202                 if (usr ==  LINK_CONFIG)
1203                         tipc_disc_rcv(net, skb, b);
1204                 else
1205                         tipc_bclink_rcv(net, skb);
1206                 return;
1207         }
1208
1209         /* Locate neighboring node that sent packet */
1210         n = tipc_node_find(net, msg_prevnode(hdr));
1211         if (unlikely(!n))
1212                 goto discard;
1213         le = &n->links[bearer_id];
1214
1215         tipc_node_lock(n);
1216
1217         /* Is reception permitted at the moment ? */
1218         if (!tipc_node_filter_pkt(n, hdr))
1219                 goto unlock;
1220
1221         if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1222                 tipc_bclink_sync_state(n, hdr);
1223
1224         /* Release acked broadcast packets */
1225         if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1226                 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1227
1228         /* Check and if necessary update node state */
1229         if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1230                 rc = tipc_link_rcv(le->link, skb, &xmitq);
1231                 skb = NULL;
1232         }
1233 unlock:
1234         tipc_node_unlock(n);
1235
1236         if (unlikely(rc & TIPC_LINK_UP_EVT))
1237                 tipc_node_link_up(n, bearer_id, &xmitq);
1238
1239         if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1240                 tipc_node_link_down(n, bearer_id, false);
1241
1242         if (unlikely(!skb_queue_empty(&n->bclink.namedq)))
1243                 tipc_named_rcv(net, &n->bclink.namedq);
1244
1245         if (!skb_queue_empty(&le->inputq))
1246                 tipc_sk_rcv(net, &le->inputq);
1247
1248         if (!skb_queue_empty(&xmitq))
1249                 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1250
1251         tipc_node_put(n);
1252 discard:
1253         kfree_skb(skb);
1254 }
1255
1256 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
1257 {
1258         int err;
1259         struct net *net = sock_net(skb->sk);
1260         struct tipc_net *tn = net_generic(net, tipc_net_id);
1261         int done = cb->args[0];
1262         int last_addr = cb->args[1];
1263         struct tipc_node *node;
1264         struct tipc_nl_msg msg;
1265
1266         if (done)
1267                 return 0;
1268
1269         msg.skb = skb;
1270         msg.portid = NETLINK_CB(cb->skb).portid;
1271         msg.seq = cb->nlh->nlmsg_seq;
1272
1273         rcu_read_lock();
1274         if (last_addr) {
1275                 node = tipc_node_find(net, last_addr);
1276                 if (!node) {
1277                         rcu_read_unlock();
1278                         /* We never set seq or call nl_dump_check_consistent()
1279                          * this means that setting prev_seq here will cause the
1280                          * consistence check to fail in the netlink callback
1281                          * handler. Resulting in the NLMSG_DONE message having
1282                          * the NLM_F_DUMP_INTR flag set if the node state
1283                          * changed while we released the lock.
1284                          */
1285                         cb->prev_seq = 1;
1286                         return -EPIPE;
1287                 }
1288                 tipc_node_put(node);
1289         }
1290
1291         list_for_each_entry_rcu(node, &tn->node_list, list) {
1292                 if (last_addr) {
1293                         if (node->addr == last_addr)
1294                                 last_addr = 0;
1295                         else
1296                                 continue;
1297                 }
1298
1299                 tipc_node_lock(node);
1300                 err = __tipc_nl_add_node(&msg, node);
1301                 if (err) {
1302                         last_addr = node->addr;
1303                         tipc_node_unlock(node);
1304                         goto out;
1305                 }
1306
1307                 tipc_node_unlock(node);
1308         }
1309         done = 1;
1310 out:
1311         cb->args[0] = done;
1312         cb->args[1] = last_addr;
1313         rcu_read_unlock();
1314
1315         return skb->len;
1316 }