tipc: improve link FSM implementation
[cascardo/linux.git] / net / tipc / link.c
1 /*
2  * net/tipc/link.c: TIPC link code
3  *
4  * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
5  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "subscr.h"
39 #include "link.h"
40 #include "bcast.h"
41 #include "socket.h"
42 #include "name_distr.h"
43 #include "discover.h"
44 #include "netlink.h"
45
46 #include <linux/pkt_sched.h>
47
48 /*
49  * Error message prefixes
50  */
51 static const char *link_co_err = "Link changeover error, ";
52 static const char *link_rst_msg = "Resetting link ";
53 static const char *link_unk_evt = "Unknown link event ";
54
55 static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
56         [TIPC_NLA_LINK_UNSPEC]          = { .type = NLA_UNSPEC },
57         [TIPC_NLA_LINK_NAME] = {
58                 .type = NLA_STRING,
59                 .len = TIPC_MAX_LINK_NAME
60         },
61         [TIPC_NLA_LINK_MTU]             = { .type = NLA_U32 },
62         [TIPC_NLA_LINK_BROADCAST]       = { .type = NLA_FLAG },
63         [TIPC_NLA_LINK_UP]              = { .type = NLA_FLAG },
64         [TIPC_NLA_LINK_ACTIVE]          = { .type = NLA_FLAG },
65         [TIPC_NLA_LINK_PROP]            = { .type = NLA_NESTED },
66         [TIPC_NLA_LINK_STATS]           = { .type = NLA_NESTED },
67         [TIPC_NLA_LINK_RX]              = { .type = NLA_U32 },
68         [TIPC_NLA_LINK_TX]              = { .type = NLA_U32 }
69 };
70
/* Properties valid for media, bearer and link */
72 static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
73         [TIPC_NLA_PROP_UNSPEC]          = { .type = NLA_UNSPEC },
74         [TIPC_NLA_PROP_PRIO]            = { .type = NLA_U32 },
75         [TIPC_NLA_PROP_TOL]             = { .type = NLA_U32 },
76         [TIPC_NLA_PROP_WIN]             = { .type = NLA_U32 }
77 };
78
79 /*
80  * Out-of-range value for link session numbers
81  */
82 #define WILDCARD_SESSION 0x10000
83
84 /* State value stored in 'failover_pkts'
85  */
86 #define FIRST_FAILOVER 0xffffu
87
88 /* Link FSM states and events:
89  */
90 enum {
91         TIPC_LINK_WORKING,
92         TIPC_LINK_PROBING,
93         TIPC_LINK_RESETTING,
94         TIPC_LINK_ESTABLISHING
95 };
96
97 enum {
98         PEER_RESET_EVT    = RESET_MSG,
99         ACTIVATE_EVT      = ACTIVATE_MSG,
100         TRAFFIC_EVT,      /* Any other valid msg from peer */
101         SILENCE_EVT       /* Peer was silent during last timer interval*/
102 };
103
104 /* Link FSM state checking routines
105  */
106 static int link_working(struct tipc_link *l)
107 {
108         return l->state == TIPC_LINK_WORKING;
109 }
110
111 static int link_probing(struct tipc_link *l)
112 {
113         return l->state == TIPC_LINK_PROBING;
114 }
115
116 static int link_resetting(struct tipc_link *l)
117 {
118         return l->state == TIPC_LINK_RESETTING;
119 }
120
121 static int link_establishing(struct tipc_link *l)
122 {
123         return l->state == TIPC_LINK_ESTABLISHING;
124 }
125
126 static void link_handle_out_of_seq_msg(struct tipc_link *link,
127                                        struct sk_buff *skb);
128 static void tipc_link_proto_rcv(struct tipc_link *link,
129                                 struct sk_buff *skb);
130 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
131 static void link_state_event(struct tipc_link *l_ptr, u32 event);
132 static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
133                                       u16 rcvgap, int tolerance, int priority,
134                                       struct sk_buff_head *xmitq);
135 static void link_reset_statistics(struct tipc_link *l_ptr);
136 static void link_print(struct tipc_link *l_ptr, const char *str);
137 static void tipc_link_sync_xmit(struct tipc_link *l);
138 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
139 static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
140 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
141 static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
142 static void link_set_timer(struct tipc_link *link, unsigned long time);
143 static void link_activate(struct tipc_link *link);
144
145 /*
146  *  Simple link routines
147  */
/* Round @i up to the next multiple of 4 (word alignment) */
static unsigned int align(unsigned int i)
{
        unsigned int rem = i % 4u;

        return rem ? i + (4u - rem) : i;
}
152
153 static void tipc_link_release(struct kref *kref)
154 {
155         kfree(container_of(kref, struct tipc_link, ref));
156 }
157
/* Take a reference on the link; paired with tipc_link_put() */
static void tipc_link_get(struct tipc_link *l_ptr)
{
        kref_get(&l_ptr->ref);
}
162
/* Drop a reference on the link; the link is freed via
 * tipc_link_release() when the last reference goes away
 */
static void tipc_link_put(struct tipc_link *l_ptr)
{
        kref_put(&l_ptr->ref, tipc_link_release);
}
167
168 static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
169 {
170         struct tipc_node *n = l->owner;
171
172         if (node_active_link(n, 0) != l)
173                 return node_active_link(n, 0);
174         return node_active_link(n, 1);
175 }
176
177 /*
178  *  Simple non-static link routines (i.e. referenced outside this file)
179  */
/* A link counts as "up" when its FSM is in WORKING or PROBING state;
 * a NULL link is never up
 */
int tipc_link_is_up(struct tipc_link *l_ptr)
{
        if (!l_ptr)
                return 0;
        if (link_working(l_ptr))
                return 1;
        return link_probing(l_ptr);
}
186
187 int tipc_link_is_active(struct tipc_link *l)
188 {
189         struct tipc_node *n = l->owner;
190
191         return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
192 }
193
/**
 * link_timeout - handle expiration of link timer
 * @data: the link, cast to unsigned long by setup_timer()
 *
 * Updates send-traffic statistics, feeds a SILENCE_EVT into the link FSM
 * when the peer said nothing during the last interval, pushes any backlog,
 * and re-arms the timer. Runs under the owning node's lock.
 */
static void link_timeout(unsigned long data)
{
        struct tipc_link *l_ptr = (struct tipc_link *)data;
        struct sk_buff *skb;

        tipc_node_lock(l_ptr->owner);

        /* update counters used in statistical profiling of send traffic */
        l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
        l_ptr->stats.queue_sz_counts++;

        skb = skb_peek(&l_ptr->transmq);
        if (skb) {
                struct tipc_msg *msg = buf_msg(skb);
                u32 length = msg_size(msg);

                /* For a first fragment, profile the size of the wrapped
                 * original message rather than the fragment itself
                 */
                if ((msg_user(msg) == MSG_FRAGMENTER) &&
                    (msg_type(msg) == FIRST_FRAGMENT)) {
                        length = msg_size(msg_get_wrapped(msg));
                }
                if (length) {
                        l_ptr->stats.msg_lengths_total += length;
                        l_ptr->stats.msg_length_counts++;
                        if (length <= 64)
                                l_ptr->stats.msg_length_profile[0]++;
                        else if (length <= 256)
                                l_ptr->stats.msg_length_profile[1]++;
                        else if (length <= 1024)
                                l_ptr->stats.msg_length_profile[2]++;
                        else if (length <= 4096)
                                l_ptr->stats.msg_length_profile[3]++;
                        else if (length <= 16384)
                                l_ptr->stats.msg_length_profile[4]++;
                        else if (length <= 32768)
                                l_ptr->stats.msg_length_profile[5]++;
                        else
                                l_ptr->stats.msg_length_profile[6]++;
                }
        }

        /* do all other link processing performed on a periodic basis */
        if (l_ptr->silent_intv_cnt)
                link_state_event(l_ptr, SILENCE_EVT);
        else if (link_working(l_ptr) && tipc_bclink_acks_missing(l_ptr->owner))
                tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);

        /* Incoming traffic must clear this before the next expiration,
         * or a SILENCE_EVT is raised above
         */
        l_ptr->silent_intv_cnt++;
        if (skb_queue_len(&l_ptr->backlogq))
                tipc_link_push_packets(l_ptr);
        link_set_timer(l_ptr, l_ptr->keepalive_intv);
        tipc_node_unlock(l_ptr->owner);
        /* Drop the reference taken when this timer shot was armed */
        tipc_link_put(l_ptr);
}
251
/* (Re)arm the link timer. mod_timer() returns 0 when the timer was not
 * already pending; in that case take a link reference on behalf of the
 * timer, which link_timeout() drops again on expiry.
 */
static void link_set_timer(struct tipc_link *link, unsigned long time)
{
        if (!mod_timer(&link->timer, jiffies + time))
                tipc_link_get(link);
}
257
/**
 * tipc_link_create - create a new link
 * @n_ptr: pointer to associated node
 * @b_ptr: pointer to associated bearer
 * @media_addr: media address to use when sending messages over link
 * @inputq: queue where the link delivers received user messages
 * @namedq: queue for name table distribution messages
 *
 * Returns pointer to link, or NULL on failure (node already has the
 * maximum number of links, a link already exists on this bearer, or
 * memory allocation failed).
 */
struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
                                   struct tipc_bearer *b_ptr,
                                   const struct tipc_media_addr *media_addr,
                                   struct sk_buff_head *inputq,
                                   struct sk_buff_head *namedq)
{
        struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
        struct tipc_link *l_ptr;
        struct tipc_msg *msg;
        char *if_name;
        char addr_string[16];
        u32 peer = n_ptr->addr;

        if (n_ptr->link_cnt >= MAX_BEARERS) {
                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_err("Cannot establish %uth link to %s. Max %u allowed.\n",
                       n_ptr->link_cnt, addr_string, MAX_BEARERS);
                return NULL;
        }

        if (n_ptr->links[b_ptr->identity].link) {
                tipc_addr_string_fill(addr_string, n_ptr->addr);
                pr_err("Attempt to establish second link on <%s> to %s\n",
                       b_ptr->name, addr_string);
                return NULL;
        }

        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
        if (!l_ptr) {
                pr_warn("Link creation failed, no memory\n");
                return NULL;
        }
        kref_init(&l_ptr->ref);
        l_ptr->addr = peer;
        if_name = strchr(b_ptr->name, ':') + 1;
        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
                tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr),
                tipc_node(tn->own_addr),
                if_name,
                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
                /* note: peer i/f name is updated by reset/activate message */
        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
        l_ptr->owner = n_ptr;
        /* Accept any session number until the peer's is learned */
        l_ptr->peer_session = WILDCARD_SESSION;
        l_ptr->bearer_id = b_ptr->identity;
        link_set_supervision_props(l_ptr, b_ptr->tolerance);
        l_ptr->state = TIPC_LINK_RESETTING;

        /* Pre-build the header used for link protocol messages */
        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
        msg = l_ptr->pmsg;
        tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
                      l_ptr->addr);
        msg_set_size(msg, sizeof(l_ptr->proto_msg));
        msg_set_session(msg, (tn->random & 0xffff));
        msg_set_bearer_id(msg, b_ptr->identity);
        strcpy((char *)msg_data(msg), if_name);
        l_ptr->net_plane = b_ptr->net_plane;
        /* Working mtu may later be negotiated down from the advertised one */
        l_ptr->advertised_mtu = b_ptr->mtu;
        l_ptr->mtu = l_ptr->advertised_mtu;
        l_ptr->priority = b_ptr->priority;
        tipc_link_set_queue_limits(l_ptr, b_ptr->window);
        l_ptr->snd_nxt = 1;
        __skb_queue_head_init(&l_ptr->transmq);
        __skb_queue_head_init(&l_ptr->backlogq);
        __skb_queue_head_init(&l_ptr->deferdq);
        skb_queue_head_init(&l_ptr->wakeupq);
        l_ptr->inputq = inputq;
        l_ptr->namedq = namedq;
        skb_queue_head_init(l_ptr->inputq);
        link_reset_statistics(l_ptr);
        tipc_node_attach_link(n_ptr, l_ptr);
        setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
        link_set_timer(l_ptr, l_ptr->keepalive_intv);
        return l_ptr;
}
341
/**
 * tipc_link_delete - Delete a link
 * @l: link to be deleted
 *
 * Resets and detaches the link; the struct itself is freed when the
 * last kref is dropped.
 */
void tipc_link_delete(struct tipc_link *l)
{
        tipc_link_reset(l);
        /* If the timer was still pending, drop the reference it held */
        if (del_timer(&l->timer))
                tipc_link_put(l);
        /* Delete link now, or when timer is finished: */
        tipc_link_reset_fragments(l);
        tipc_node_detach_link(l->owner, l);
        tipc_link_put(l);
}
356
/* Delete the link bound to @bearer_id on every known node.
 * Walks the node list under RCU, taking each node's lock in turn.
 */
void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *link;
        struct tipc_node *node;

        rcu_read_lock();
        list_for_each_entry_rcu(node, &tn->node_list, list) {
                tipc_node_lock(node);
                link = node->links[bearer_id].link;
                if (link)
                        tipc_link_delete(link);
                tipc_node_unlock(node);
        }
        rcu_read_unlock();
}
373
/**
 * tipc_link_fsm_evt - link finite state machine
 * @l: pointer to link
 * @evt: state machine event to be processed
 * @xmitq: queue to prepend created protocol message, if any
 *
 * Returns a bitmask of TIPC_LINK_UP_EVT / TIPC_LINK_DOWN_EVT telling the
 * caller which node-level actions to perform, or 0 for none.
 */
static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
                             struct sk_buff_head *xmitq)
{
        int mtyp = 0, rc = 0;
        struct tipc_link *pl;
        /* Side effects decided by the state/event matrix below;
         * all are executed in one place after the matrix
         */
        enum {
                LINK_RESET    = 1,
                LINK_ACTIVATE = (1 << 1),
                SND_PROBE     = (1 << 2),
                SND_STATE     = (1 << 3),
                SND_RESET     = (1 << 4),
                SND_ACTIVATE  = (1 << 5)
        } actions = 0;

        /* Events are ignored while the link is blocked, e.g. during failover */
        if (l->exec_mode == TIPC_LINK_BLOCKED)
                return rc;

        switch (l->state) {
        case TIPC_LINK_WORKING:
                switch (evt) {
                case TRAFFIC_EVT:
                case ACTIVATE_EVT:
                        break;
                case SILENCE_EVT:
                        /* Peer said nothing last interval; start probing */
                        l->state = TIPC_LINK_PROBING;
                        actions |= SND_PROBE;
                        break;
                case PEER_RESET_EVT:
                        actions |= LINK_RESET | SND_ACTIVATE;
                        break;
                default:
                        pr_debug("%s%u WORKING\n", link_unk_evt, evt);
                }
                break;
        case TIPC_LINK_PROBING:
                switch (evt) {
                case TRAFFIC_EVT:
                case ACTIVATE_EVT:
                        /* Peer answered; back to normal operation */
                        l->state = TIPC_LINK_WORKING;
                        break;
                case PEER_RESET_EVT:
                        actions |= LINK_RESET | SND_ACTIVATE;
                        break;
                case SILENCE_EVT:
                        /* Keep probing until the abort limit is exceeded */
                        if (l->silent_intv_cnt <= l->abort_limit) {
                                actions |= SND_PROBE;
                                break;
                        }
                        actions |= LINK_RESET | SND_RESET;
                        break;
                default:
                        pr_err("%s%u PROBING\n", link_unk_evt, evt);
                }
                break;
        case TIPC_LINK_RESETTING:
                switch (evt) {
                case TRAFFIC_EVT:
                        break;
                case ACTIVATE_EVT:
                        /* Hold back going up while the parallel link
                         * is still probing its peer
                         */
                        pl = node_active_link(l->owner, 0);
                        if (pl && link_probing(pl))
                                break;
                        actions |= LINK_ACTIVATE;
                        if (l->owner->working_links == 1)
                                tipc_link_sync_xmit(l);
                        break;
                case PEER_RESET_EVT:
                        l->state = TIPC_LINK_ESTABLISHING;
                        actions |= SND_ACTIVATE;
                        break;
                case SILENCE_EVT:
                        actions |= SND_RESET;
                        break;
                default:
                        pr_err("%s%u in RESETTING\n", link_unk_evt, evt);
                }
                break;
        case TIPC_LINK_ESTABLISHING:
                switch (evt) {
                case TRAFFIC_EVT:
                case ACTIVATE_EVT:
                        /* Hold back going up while the parallel link
                         * is still probing its peer
                         */
                        pl = node_active_link(l->owner, 0);
                        if (pl && link_probing(pl))
                                break;
                        actions |= LINK_ACTIVATE;
                        if (l->owner->working_links == 1)
                                tipc_link_sync_xmit(l);
                        break;
                case PEER_RESET_EVT:
                        break;
                case SILENCE_EVT:
                        actions |= SND_ACTIVATE;
                        break;
                default:
                        pr_err("%s%u ESTABLISHING\n", link_unk_evt, evt);
                }
                break;
        default:
                pr_err("Unknown link state %u/%u\n", l->state, evt);
        }

        /* Perform actions as decided by FSM */
        if (actions & LINK_RESET) {
                l->exec_mode = TIPC_LINK_BLOCKED;
                rc |= TIPC_LINK_DOWN_EVT;
        }
        if (actions & LINK_ACTIVATE) {
                l->exec_mode = TIPC_LINK_OPEN;
                rc |= TIPC_LINK_UP_EVT;
        }
        /* At most one protocol message is built per invocation */
        if (actions & (SND_STATE | SND_PROBE))
                mtyp = STATE_MSG;
        if (actions & SND_RESET)
                mtyp = RESET_MSG;
        if (actions & SND_ACTIVATE)
                mtyp = ACTIVATE_MSG;
        if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
                tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
                                          0, 0, 0, xmitq);
        return rc;
}
501
502 /**
503  * link_schedule_user - schedule a message sender for wakeup after congestion
504  * @link: congested link
505  * @list: message that was attempted sent
506  * Create pseudo msg to send back to user when congestion abates
507  * Does not consume buffer list
508  */
static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
{
        struct tipc_msg *msg = buf_msg(skb_peek(list));
        int imp = msg_importance(msg);
        u32 oport = msg_origport(msg);
        u32 addr = link_own_addr(link);
        struct sk_buff *skb;

        /* This really cannot happen...  */
        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
                return -ENOBUFS;
        }
        /* Non-blocking sender: */
        if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
                return -ELINKCONG;

        /* Create and schedule wakeup pseudo message */
        skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
                              addr, addr, oport, 0, 0);
        if (!skb)
                return -ENOBUFS;
        /* Chain length and importance are read back by
         * link_prepare_wakeup() when deciding whom to wake
         */
        TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
        TIPC_SKB_CB(skb)->chain_imp = imp;
        skb_queue_tail(&link->wakeupq, skb);
        link->stats.link_congs++;
        return -ELINKCONG;
}
537
/**
 * link_prepare_wakeup - prepare users for wakeup after congestion
 * @l: congested link
 * Move a number of waiting users, as permitted by available space in
 * the send queue, from link wait queue to node wait queue for wakeup
 */
void link_prepare_wakeup(struct tipc_link *l)
{
        /* Per-importance count of messages admitted so far in this walk */
        int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
        int imp, lim;
        struct sk_buff *skb, *tmp;

        skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
                imp = TIPC_SKB_CB(skb)->chain_imp;
                lim = l->window + l->backlog[imp].limit;
                pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
                /* Stop at the first user whose pending chain no longer fits */
                if ((pnd[imp] + l->backlog[imp].len) >= lim)
                        break;
                skb_unlink(skb, &l->wakeupq);
                skb_queue_tail(l->inputq, skb);
                l->owner->inputq = l->inputq;
                l->owner->action_flags |= TIPC_MSG_EVT;
        }
}
562
563 /**
564  * tipc_link_reset_fragments - purge link's inbound message fragments queue
565  * @l_ptr: pointer to link
566  */
567 void tipc_link_reset_fragments(struct tipc_link *l_ptr)
568 {
569         kfree_skb(l_ptr->reasm_buf);
570         l_ptr->reasm_buf = NULL;
571 }
572
573 void tipc_link_purge_backlog(struct tipc_link *l)
574 {
575         __skb_queue_purge(&l->backlogq);
576         l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
577         l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
578         l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
579         l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
580         l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
581 }
582
583 /**
584  * tipc_link_purge_queues - purge all pkt queues associated with link
585  * @l_ptr: pointer to link
586  */
587 void tipc_link_purge_queues(struct tipc_link *l_ptr)
588 {
589         __skb_queue_purge(&l_ptr->deferdq);
590         __skb_queue_purge(&l_ptr->transmq);
591         tipc_link_purge_backlog(l_ptr);
592         tipc_link_reset_fragments(l_ptr);
593 }
594
/* Take the link down: bump the session number, notify node and bearer
 * layers, hand over failover state to the parallel link when one exists,
 * and purge all queues except inputq.
 */
void tipc_link_reset(struct tipc_link *l_ptr)
{
        u32 prev_state = l_ptr->state;
        int was_active_link = tipc_link_is_active(l_ptr);
        struct tipc_node *owner = l_ptr->owner;
        struct tipc_link *pl = tipc_parallel_link(l_ptr);

        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));

        /* Link is down, accept any session */
        l_ptr->peer_session = WILDCARD_SESSION;

        /* Prepare for renewed mtu size negotiation */
        l_ptr->mtu = l_ptr->advertised_mtu;

        l_ptr->state = TIPC_LINK_RESETTING;

        /* Link was already down; no further teardown needed */
        if ((prev_state == TIPC_LINK_RESETTING) ||
            (prev_state == TIPC_LINK_ESTABLISHING))
                return;

        tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
        tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);

        /* Failover: block this link and pass its receive checkpoint and
         * reassembly buffer on to the parallel link
         */
        if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
                l_ptr->exec_mode = TIPC_LINK_BLOCKED;
                l_ptr->failover_checkpt = l_ptr->rcv_nxt;
                pl->failover_pkts = FIRST_FAILOVER;
                pl->failover_checkpt = l_ptr->rcv_nxt;
                pl->failover_skb = l_ptr->reasm_buf;
        } else {
                kfree_skb(l_ptr->reasm_buf);
        }
        /* Clean up all queues, except inputq: */
        __skb_queue_purge(&l_ptr->transmq);
        __skb_queue_purge(&l_ptr->deferdq);
        if (!owner->inputq)
                owner->inputq = l_ptr->inputq;
        /* Deliver pending wakeup messages to blocked senders now */
        skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
        if (!skb_queue_empty(owner->inputq))
                owner->action_flags |= TIPC_MSG_EVT;
        tipc_link_purge_backlog(l_ptr);
        l_ptr->reasm_buf = NULL;
        l_ptr->rcv_unacked = 0;
        l_ptr->snd_nxt = 1;
        l_ptr->silent_intv_cnt = 0;
        l_ptr->stale_count = 0;
        link_reset_statistics(l_ptr);
}
644
/* Bring the link into WORKING state and announce it to the node and
 * bearer layers; packet numbering restarts at 1
 */
static void link_activate(struct tipc_link *link)
{
        struct tipc_node *node = link->owner;

        link->rcv_nxt = 1;
        link->stats.recv_info = 1;
        link->silent_intv_cnt = 0;
        link->state = TIPC_LINK_WORKING;
        link->exec_mode = TIPC_LINK_OPEN;
        tipc_node_link_up(node, link->bearer_id);
        tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
}
657
/**
 * link_state_event - link finite state machine
 * @l: pointer to link
 * @evt: state machine event to process
 *
 * Wrapper around tipc_link_fsm_evt(): performs the up/down actions the
 * FSM requested and transmits the protocol message it built, if any.
 */
static void link_state_event(struct tipc_link *l, unsigned int evt)
{
        int rc;
        struct sk_buff_head xmitq;
        struct sk_buff *skb;

        if (l->exec_mode == TIPC_LINK_BLOCKED)
                return;

        __skb_queue_head_init(&xmitq);

        rc = tipc_link_fsm_evt(l, evt, &xmitq);

        if (rc & TIPC_LINK_UP_EVT)
                link_activate(l);

        if (rc & TIPC_LINK_DOWN_EVT)
                tipc_link_reset(l);

        /* The FSM builds at most one protocol message per event */
        skb = __skb_dequeue(&xmitq);
        if (!skb)
                return;
        tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
}
687
/**
 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
 * @net: network namespace
 * @link: link to use
 * @list: chain of buffers containing message
 *
 * Consumes the buffer chain, except when returning an error code,
 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
 */
int __tipc_link_xmit(struct net *net, struct tipc_link *link,
                     struct sk_buff_head *list)
{
        struct tipc_msg *msg = buf_msg(skb_peek(list));
        unsigned int maxwin = link->window;
        unsigned int i, imp = msg_importance(msg);
        uint mtu = link->mtu;
        u16 ack = mod(link->rcv_nxt - 1);
        u16 seqno = link->snd_nxt;
        u16 bc_last_in = link->owner->bclink.last_in;
        struct tipc_media_addr *addr = &link->media_addr;
        struct sk_buff_head *transmq = &link->transmq;
        struct sk_buff_head *backlogq = &link->backlogq;
        struct sk_buff *skb, *bskb;

        /* Match msg importance against this and all higher backlog limits: */
        for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
                if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
                        return link_schedule_user(link, list);
        }
        if (unlikely(msg_size(msg) > mtu))
                return -EMSGSIZE;

        /* Prepare each packet for sending, and add to relevant queue: */
        while (skb_queue_len(list)) {
                skb = skb_peek(list);
                msg = buf_msg(skb);
                msg_set_seqno(msg, seqno);
                msg_set_ack(msg, ack);
                msg_set_bcast_ack(msg, bc_last_in);

                /* Send window open: transmit directly */
                if (likely(skb_queue_len(transmq) < maxwin)) {
                        __skb_dequeue(list);
                        __skb_queue_tail(transmq, skb);
                        tipc_bearer_send(net, link->bearer_id, skb, addr);
                        link->rcv_unacked = 0;
                        seqno++;
                        continue;
                }
                /* Window closed: try appending to the last backlog bundle */
                if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
                        kfree_skb(__skb_dequeue(list));
                        link->stats.sent_bundled++;
                        continue;
                }
                /* Otherwise try starting a new bundle */
                if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
                        kfree_skb(__skb_dequeue(list));
                        __skb_queue_tail(backlogq, bskb);
                        link->backlog[msg_importance(buf_msg(bskb))].len++;
                        link->stats.sent_bundled++;
                        link->stats.sent_bundles++;
                        continue;
                }
                /* Not bundleable: move the whole remaining chain to backlog */
                link->backlog[imp].len += skb_queue_len(list);
                skb_queue_splice_tail_init(list, backlogq);
        }
        link->snd_nxt = seqno;
        return 0;
}
755
/**
 * tipc_link_xmit(): enqueue buffer list according to queue situation
 * @l: link to use
 * @list: chain of buffers containing message
 * @xmitq: returned list of packets to be sent by caller
 *
 * Consumes the buffer chain, except when returning -ELINKCONG,
 * since the caller then may want to make more send attempts.
 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
 */
int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                   struct sk_buff_head *xmitq)
{
        struct tipc_msg *hdr = buf_msg(skb_peek(list));
        unsigned int maxwin = l->window;
        unsigned int i, imp = msg_importance(hdr);
        unsigned int mtu = l->mtu;
        u16 ack = l->rcv_nxt - 1;
        u16 seqno = l->snd_nxt;
        u16 bc_last_in = l->owner->bclink.last_in;
        struct sk_buff_head *transmq = &l->transmq;
        struct sk_buff_head *backlogq = &l->backlogq;
        struct sk_buff *skb, *_skb, *bskb;

        /* Match msg importance against this and all higher backlog limits: */
        for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
                if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
                        return link_schedule_user(l, list);
        }
        if (unlikely(msg_size(hdr) > mtu))
                return -EMSGSIZE;

        /* Prepare each packet for sending, and add to relevant queue: */
        while (skb_queue_len(list)) {
                skb = skb_peek(list);
                hdr = buf_msg(skb);
                msg_set_seqno(hdr, seqno);
                msg_set_ack(hdr, ack);
                msg_set_bcast_ack(hdr, bc_last_in);

                /* Send window open: clone kept in transmq for retransmission,
                 * original handed to caller via xmitq
                 */
                if (likely(skb_queue_len(transmq) < maxwin)) {
                        _skb = skb_clone(skb, GFP_ATOMIC);
                        /* NOTE(review): a clone failure mid-chain returns
                         * -ENOBUFS with part of the chain already queued;
                         * confirm callers treat this as a link failure
                         */
                        if (!_skb)
                                return -ENOBUFS;
                        __skb_dequeue(list);
                        __skb_queue_tail(transmq, skb);
                        __skb_queue_tail(xmitq, _skb);
                        l->rcv_unacked = 0;
                        seqno++;
                        continue;
                }
                /* Window closed: try appending to the last backlog bundle */
                if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
                        kfree_skb(__skb_dequeue(list));
                        l->stats.sent_bundled++;
                        continue;
                }
                /* Otherwise try starting a new bundle */
                if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
                        kfree_skb(__skb_dequeue(list));
                        __skb_queue_tail(backlogq, bskb);
                        l->backlog[msg_importance(buf_msg(bskb))].len++;
                        l->stats.sent_bundled++;
                        l->stats.sent_bundles++;
                        continue;
                }
                /* Not bundleable: move the whole remaining chain to backlog */
                l->backlog[imp].len += skb_queue_len(list);
                skb_queue_splice_tail_init(list, backlogq);
        }
        l->snd_nxt = seqno;
        return 0;
}
827
/* skb2list - initialize @list as a queue containing only @skb */
static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
{
	skb_queue_head_init(list);
	__skb_queue_head(list, skb);
}
833
834 static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
835 {
836         struct sk_buff_head head;
837
838         skb2list(skb, &head);
839         return __tipc_link_xmit(link->owner->net, link, &head);
840 }
841
/*
 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
 *
 * Give a newly added peer node the sequence number where it should
 * start receiving and acking broadcast packets.
 *
 * Called with node locked
 */
static void tipc_link_sync_xmit(struct tipc_link *link)
{
	struct sk_buff *skb;
	struct tipc_msg *msg;

	/* Best effort: silently skip on allocation failure */
	skb = tipc_buf_acquire(INT_H_SIZE);
	if (!skb)
		return;

	/* Tell peer the last broadcast seqno we have acknowledged */
	msg = buf_msg(skb);
	tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
		      INT_H_SIZE, link->addr);
	msg_set_last_bcast(msg, link->owner->bclink.acked);
	__tipc_link_xmit_skb(link, skb);
}
865
866 /*
867  * tipc_link_sync_rcv - synchronize broadcast link endpoints.
868  * Receive the sequence number where we should start receiving and
869  * acking broadcast packets from a newly added peer node, and open
870  * up for reception of such packets.
871  *
872  * Called with node locked
873  */
874 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
875 {
876         struct tipc_msg *msg = buf_msg(buf);
877
878         n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
879         n->bclink.recv_permitted = true;
880         kfree_skb(buf);
881 }
882
/*
 * tipc_link_push_packets - push unsent packets to bearer
 *
 * Push out the unsent messages of a link where congestion
 * has abated. Node is locked.
 *
 * Called with node locked
 */
void tipc_link_push_packets(struct tipc_link *link)
{
	struct sk_buff *skb;
	struct tipc_msg *msg;
	u16 seqno = link->snd_nxt;
	u16 ack = mod(link->rcv_nxt - 1);

	/* Move backlog packets to transmq while the send window has room */
	while (skb_queue_len(&link->transmq) < link->window) {
		skb = __skb_dequeue(&link->backlogq);
		if (!skb)
			break;
		msg = buf_msg(skb);
		/* Leaving backlog: decrement its importance-level counter */
		link->backlog[msg_importance(msg)].len--;
		/* Stamp sequence and ack info just before transmission */
		msg_set_ack(msg, ack);
		msg_set_seqno(msg, seqno);
		seqno = mod(seqno + 1);
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		link->rcv_unacked = 0;
		__skb_queue_tail(&link->transmq, skb);
		tipc_bearer_send(link->owner->net, link->bearer_id,
				 skb, &link->media_addr);
	}
	link->snd_nxt = seqno;
}
915
916 void tipc_link_reset_all(struct tipc_node *node)
917 {
918         char addr_string[16];
919         u32 i;
920
921         tipc_node_lock(node);
922
923         pr_warn("Resetting all links to %s\n",
924                 tipc_addr_string_fill(addr_string, node->addr));
925
926         for (i = 0; i < MAX_BEARERS; i++) {
927                 if (node->links[i].link) {
928                         link_print(node->links[i].link, "Resetting link\n");
929                         tipc_link_reset(node->links[i].link);
930                 }
931         }
932
933         tipc_node_unlock(node);
934 }
935
/* link_retransmit_failure - react to persistent retransmission failure
 *
 * A unicast link (addr != 0) is simply reset; for the broadcast link
 * diagnostic state is logged and the node is flagged for broadcast
 * reset instead. Node is locked.
 */
static void link_retransmit_failure(struct tipc_link *l_ptr,
				    struct sk_buff *buf)
{
	struct tipc_msg *msg = buf_msg(buf);
	struct net *net = l_ptr->owner->net;

	pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);

	if (l_ptr->addr) {
		/* Handle failure on standard link */
		link_print(l_ptr, "Resetting link\n");
		tipc_link_reset(l_ptr);

	} else {
		/* Handle failure on broadcast link */
		struct tipc_node *n_ptr;
		char addr_string[16];

		pr_info("Msg seq number: %u,  ", msg_seqno(msg));
		pr_cont("Outstanding acks: %lu\n",
			(unsigned long) TIPC_SKB_CB(buf)->handle);

		/* Identify the peer that is furthest behind with its acks */
		n_ptr = tipc_bclink_retransmit_to(net);

		tipc_addr_string_fill(addr_string, n_ptr->addr);
		pr_info("Broadcast link info for %s\n", addr_string);
		pr_info("Reception permitted: %d,  Acked: %u\n",
			n_ptr->bclink.recv_permitted,
			n_ptr->bclink.acked);
		pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
			n_ptr->bclink.last_in,
			n_ptr->bclink.oos_state,
			n_ptr->bclink.last_sent);

		n_ptr->action_flags |= TIPC_BCAST_RESET;
		l_ptr->stale_count = 0;
	}
}
974
/* tipc_link_retransmit - retransmit up to @retransmits packets starting
 * at @skb in the link's transmit queue
 *
 * Triggers link_retransmit_failure() when the same head packet has been
 * retransmitted more than 100 times in a row. Node is locked.
 */
void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
			  u32 retransmits)
{
	struct tipc_msg *msg;

	if (!skb)
		return;

	msg = buf_msg(skb);

	/* Detect repeated retransmit failures */
	if (l_ptr->last_retransm == msg_seqno(msg)) {
		if (++l_ptr->stale_count > 100) {
			link_retransmit_failure(l_ptr, skb);
			return;
		}
	} else {
		/* New head-of-queue packet: restart the stale counter */
		l_ptr->last_retransm = msg_seqno(msg);
		l_ptr->stale_count = 1;
	}

	skb_queue_walk_from(&l_ptr->transmq, skb) {
		if (!retransmits)
			break;
		msg = buf_msg(skb);
		/* Refresh ack info before each retransmission */
		msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
		tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
				 &l_ptr->media_addr);
		retransmits--;
		l_ptr->stats.retransmitted++;
	}
}
1008
/* link_synch(): check if all packets arrived before the synch
 *               point have been consumed
 * Returns true if the parallel links are synched, otherwise false
 */
static bool link_synch(struct tipc_link *l)
{
	unsigned int post_synch;
	struct tipc_link *pl;

	/* No parallel link means nothing to wait for */
	pl  = tipc_parallel_link(l);
	if (pl == l)
		goto synched;

	/* Was last pre-synch packet added to input queue ? */
	if (less_eq(pl->rcv_nxt, l->synch_point))
		return false;

	/* Is it still in the input queue ? */
	post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
	if (skb_queue_len(pl->inputq) > post_synch)
		return false;
synched:
	/* All pre-synch packets consumed: leave tunnel mode */
	l->exec_mode = TIPC_LINK_OPEN;
	return true;
}
1034
1035 static void link_retrieve_defq(struct tipc_link *link,
1036                                struct sk_buff_head *list)
1037 {
1038         u16 seq_no;
1039
1040         if (skb_queue_empty(&link->deferdq))
1041                 return;
1042
1043         seq_no = buf_seqno(skb_peek(&link->deferdq));
1044         if (seq_no == link->rcv_nxt)
1045                 skb_queue_splice_tail_init(&link->deferdq, list);
1046 }
1047
/**
 * tipc_rcv - process TIPC packets/messages arriving from off-node
 * @net: the applicable net namespace
 * @skb: TIPC packet
 * @b_ptr: pointer to bearer message arrived on
 *
 * Invoked with no locks held.  Bearer pointer must point to a valid bearer
 * structure (i.e. cannot be NULL), but bearer can be inactive.
 */
void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
{
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct sk_buff_head head;
	struct tipc_node *n_ptr;
	struct tipc_link *l_ptr;
	struct sk_buff *skb1, *tmp;
	struct tipc_msg *msg;
	u16 seq_no;
	u16 ackd;
	u32 released;

	skb2list(skb, &head);

	/* Process arrived packet plus any in-sequence deferred packets
	 * that processing it may release
	 */
	while ((skb = __skb_dequeue(&head))) {
		/* Ensure message is well-formed */
		if (unlikely(!tipc_msg_validate(skb)))
			goto discard;

		/* Handle arrival of a non-unicast link message */
		msg = buf_msg(skb);
		if (unlikely(msg_non_seq(msg))) {
			if (msg_user(msg) ==  LINK_CONFIG)
				tipc_disc_rcv(net, skb, b_ptr);
			else
				tipc_bclink_rcv(net, skb);
			continue;
		}

		/* Discard unicast link messages destined for another node */
		if (unlikely(!msg_short(msg) &&
			     (msg_destnode(msg) != tn->own_addr)))
			goto discard;

		/* Locate neighboring node that sent message;
		 * tipc_node_find() takes a reference, released below
		 */
		n_ptr = tipc_node_find(net, msg_prevnode(msg));
		if (unlikely(!n_ptr))
			goto discard;

		tipc_node_lock(n_ptr);
		/* Locate unicast link endpoint that should handle message */
		l_ptr = n_ptr->links[b_ptr->identity].link;
		if (unlikely(!l_ptr))
			goto unlock;

		/* Verify that communication with node is currently allowed */
		if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
		    msg_user(msg) == LINK_PROTOCOL &&
		    (msg_type(msg) == RESET_MSG ||
		    msg_type(msg) == ACTIVATE_MSG) &&
		    !msg_redundant_link(msg))
			n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;

		if (tipc_node_blocked(n_ptr))
			goto unlock;

		/* Validate message sequence number info */
		seq_no = msg_seqno(msg);
		ackd = msg_ack(msg);

		/* Release acked messages */
		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));

		/* Drop everything up to and including the acked seqno
		 * from the transmit queue
		 */
		released = 0;
		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
			if (more(buf_seqno(skb1), ackd))
				break;
			 __skb_unlink(skb1, &l_ptr->transmq);
			 kfree_skb(skb1);
			 released = 1;
		}

		/* Try sending any messages link endpoint has pending */
		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
			tipc_link_push_packets(l_ptr);

		if (released && !skb_queue_empty(&l_ptr->wakeupq))
			link_prepare_wakeup(l_ptr);

		/* Process the incoming packet */
		if (unlikely(!link_working(l_ptr))) {
			if (msg_user(msg) == LINK_PROTOCOL) {
				tipc_link_proto_rcv(l_ptr, skb);
				link_retrieve_defq(l_ptr, &head);
				skb = NULL;
				goto unlock;
			}

			/* Traffic message. Conditionally activate link */
			link_state_event(l_ptr, TRAFFIC_EVT);

			if (link_working(l_ptr)) {
				/* Re-insert buffer in front of queue */
				__skb_queue_head(&head, skb);
				skb = NULL;
				goto unlock;
			}
			goto unlock;
		}

		/* Link is now in state TIPC_LINK_WORKING */
		if (unlikely(seq_no != l_ptr->rcv_nxt)) {
			link_handle_out_of_seq_msg(l_ptr, skb);
			link_retrieve_defq(l_ptr, &head);
			skb = NULL;
			goto unlock;
		}
		l_ptr->silent_intv_cnt = 0;

		/* Synchronize with parallel link if applicable */
		if (unlikely((l_ptr->exec_mode == TIPC_LINK_TUNNEL) &&
			     !msg_dup(msg))) {
			if (!link_synch(l_ptr))
				goto unlock;
		}
		l_ptr->rcv_nxt++;
		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
			link_retrieve_defq(l_ptr, &head);
		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
			l_ptr->stats.sent_acks++;
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
		}
		tipc_link_input(l_ptr, skb);
		skb = NULL;
unlock:
		tipc_node_unlock(n_ptr);
		tipc_node_put(n_ptr);
discard:
		/* skb is non-NULL here only if nobody consumed it above */
		if (unlikely(skb))
			kfree_skb(skb);
	}
}
1190
/* tipc_data_input - deliver data and name distr msgs to upper layer
 *
 * Consumes buffer if message is of right type
 * Node lock must be held
 * Returns true if the buffer was consumed (delivered or dropped),
 * false if the caller must process it further
 */
static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
{
	struct tipc_node *node = link->owner;
	struct tipc_msg *msg = buf_msg(skb);
	u32 dport = msg_destport(msg);

	switch (msg_user(msg)) {
	case TIPC_LOW_IMPORTANCE:
	case TIPC_MEDIUM_IMPORTANCE:
	case TIPC_HIGH_IMPORTANCE:
	case TIPC_CRITICAL_IMPORTANCE:
	case CONN_MANAGER:
		/* Data or connection management: queue towards destination
		 * socket and flag the node when delivery is needed
		 */
		if (tipc_skb_queue_tail(link->inputq, skb, dport)) {
			node->inputq = link->inputq;
			node->action_flags |= TIPC_MSG_EVT;
		}
		return true;
	case NAME_DISTRIBUTOR:
		/* Name table updates go on the dedicated named queue */
		node->bclink.recv_permitted = true;
		node->namedq = link->namedq;
		skb_queue_tail(link->namedq, skb);
		if (skb_queue_len(link->namedq) == 1)
			node->action_flags |= TIPC_NAMED_MSG_EVT;
		return true;
	case MSG_BUNDLER:
	case TUNNEL_PROTOCOL:
	case MSG_FRAGMENTER:
	case BCAST_PROTOCOL:
		/* Link-internal users: handled by tipc_link_input() */
		return false;
	default:
		pr_warn("Dropping received illegal msg type\n");
		kfree_skb(skb);
		return false;
	};
}
1231
/* tipc_link_input - process packet that has passed link protocol check
 *
 * Consumes buffer
 * Node lock must be held
 */
static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
{
	struct tipc_node *node = link->owner;
	struct tipc_msg *msg = buf_msg(skb);
	struct sk_buff *iskb;
	int pos = 0;

	/* Plain data messages are delivered directly */
	if (likely(tipc_data_input(link, skb)))
		return;

	switch (msg_user(msg)) {
	case TUNNEL_PROTOCOL:
		if (msg_dup(msg)) {
			/* Duplicate from parallel link: record synch point */
			link->exec_mode = TIPC_LINK_TUNNEL;
			link->synch_point = msg_seqno(msg_get_wrapped(msg));
			kfree_skb(skb);
			break;
		}
		/* Failover packet; may replace skb with the wrapped msg */
		if (!tipc_link_failover_rcv(link, &skb))
			break;
		if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
			tipc_data_input(link, skb);
			break;
		}
		/* fall through - extracted failover packet is a bundle.
		 * NOTE(review): msg still points at the original tunnel
		 * header here, not at buf_msg(skb); confirm that the
		 * msg_msgcnt(msg) use below is intentional.
		 */
	case MSG_BUNDLER:
		link->stats.recv_bundles++;
		link->stats.recv_bundled += msg_msgcnt(msg);

		/* Unpack and deliver each bundled message */
		while (tipc_msg_extract(skb, &iskb, &pos))
			tipc_data_input(link, iskb);
		break;
	case MSG_FRAGMENTER:
		link->stats.recv_fragments++;
		if (tipc_buf_append(&link->reasm_buf, &skb)) {
			/* Last fragment received: deliver reassembled msg */
			link->stats.recv_fragmented++;
			tipc_data_input(link, skb);
		} else if (!link->reasm_buf) {
			/* Reassembly failed: reset link to resynchronize */
			tipc_link_reset(link);
		}
		break;
	case BCAST_PROTOCOL:
		tipc_link_sync_rcv(node, skb);
		break;
	default:
		break;
	};
}
1284
1285 /**
1286  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1287  *
1288  * Returns increase in queue length (i.e. 0 or 1)
1289  */
1290 u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1291 {
1292         struct sk_buff *skb1;
1293         u16 seq_no = buf_seqno(skb);
1294
1295         /* Empty queue ? */
1296         if (skb_queue_empty(list)) {
1297                 __skb_queue_tail(list, skb);
1298                 return 1;
1299         }
1300
1301         /* Last ? */
1302         if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1303                 __skb_queue_tail(list, skb);
1304                 return 1;
1305         }
1306
1307         /* Locate insertion point in queue, then insert; discard if duplicate */
1308         skb_queue_walk(list, skb1) {
1309                 u16 curr_seqno = buf_seqno(skb1);
1310
1311                 if (seq_no == curr_seqno) {
1312                         kfree_skb(skb);
1313                         return 0;
1314                 }
1315
1316                 if (less(seq_no, curr_seqno))
1317                         break;
1318         }
1319
1320         __skb_queue_before(list, skb1, skb);
1321         return 1;
1322 }
1323
/*
 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
 *
 * Consumes or defers @buf. Node is locked.
 */
static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
				       struct sk_buff *buf)
{
	u32 seq_no = buf_seqno(buf);

	/* Protocol messages are not sequenced: process them directly */
	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
		tipc_link_proto_rcv(l_ptr, buf);
		return;
	}

	/* Record OOS packet arrival */
	l_ptr->silent_intv_cnt = 0;

	/*
	 * Discard packet if a duplicate; otherwise add it to deferred queue
	 * and notify peer of gap as per protocol specification
	 */
	if (less(seq_no, l_ptr->rcv_nxt)) {
		l_ptr->stats.duplicates++;
		kfree_skb(buf);
		return;
	}

	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
		l_ptr->stats.deferred_recv++;
		/* Nudge peer with a state msg every TIPC_MIN_LINK_WIN pkts */
		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
	} else {
		l_ptr->stats.duplicates++;
	}
}
1358
1359 /*
1360  * Send protocol message to the other endpoint.
1361  */
1362 void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1363                           u32 gap, u32 tolerance, u32 priority)
1364 {
1365         struct sk_buff *skb = NULL;
1366         struct sk_buff_head xmitq;
1367
1368         __skb_queue_head_init(&xmitq);
1369         tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap,
1370                                   tolerance, priority, &xmitq);
1371         skb = __skb_dequeue(&xmitq);
1372         if (!skb)
1373                 return;
1374         tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
1375         l->rcv_unacked = 0;
1376         kfree_skb(skb);
1377 }
1378
/*
 * Receive protocol message :
 * Note that network plane id propagates through the network, and may
 * change at any time. The node with lowest address rules
 *
 * Consumes @buf. Node is locked.
 */
static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
				struct sk_buff *buf)
{
	u32 rec_gap = 0;
	u32 msg_tol;
	struct tipc_msg *msg = buf_msg(buf);

	/* Ignore everything while the link is blocked (e.g. failover) */
	if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
		goto exit;

	if (l_ptr->net_plane != msg_net_plane(msg))
		if (link_own_addr(l_ptr) > msg_prevnode(msg))
			l_ptr->net_plane = msg_net_plane(msg);

	switch (msg_type(msg)) {

	case RESET_MSG:
		if (!link_probing(l_ptr) &&
		    (l_ptr->peer_session != WILDCARD_SESSION)) {
			if (less_eq(msg_session(msg), l_ptr->peer_session))
				break; /* duplicate or old reset: ignore */
		}

		if (!msg_redundant_link(msg) && (link_working(l_ptr) ||
						 link_probing(l_ptr))) {
			/* peer has lost contact -- don't allow peer's links
			 * to reactivate before we recognize loss & clean up
			 */
			l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
		}

		link_state_event(l_ptr, RESET_MSG);

		/* fall thru' */
	case ACTIVATE_MSG:
		/* Update link settings according other endpoint's values */
		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));

		msg_tol = msg_link_tolerance(msg);
		if (msg_tol > l_ptr->tolerance)
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) > l_ptr->priority)
			l_ptr->priority = msg_linkprio(msg);

		/* Both endpoints must use the smaller of the two MTUs */
		if (l_ptr->mtu > msg_max_pkt(msg))
			l_ptr->mtu = msg_max_pkt(msg);

		/* Synchronize broadcast link info, if not done previously */
		if (!tipc_node_is_up(l_ptr->owner)) {
			l_ptr->owner->bclink.last_sent =
				l_ptr->owner->bclink.last_in =
				msg_last_bcast(msg);
			l_ptr->owner->bclink.oos_state = 0;
		}

		l_ptr->peer_session = msg_session(msg);
		l_ptr->peer_bearer_id = msg_bearer_id(msg);

		if (msg_type(msg) == ACTIVATE_MSG)
			link_state_event(l_ptr, ACTIVATE_MSG);
		break;
	case STATE_MSG:

		msg_tol = msg_link_tolerance(msg);
		if (msg_tol)
			link_set_supervision_props(l_ptr, msg_tol);

		if (msg_linkprio(msg) &&
		    (msg_linkprio(msg) != l_ptr->priority)) {
			pr_debug("%s<%s>, priority change %u->%u\n",
				 link_rst_msg, l_ptr->name,
				 l_ptr->priority, msg_linkprio(msg));
			l_ptr->priority = msg_linkprio(msg);
			tipc_link_reset(l_ptr); /* Enforce change to take effect */
			break;
		}

		/* Record reception; force mismatch at next timeout: */
		l_ptr->silent_intv_cnt = 0;

		link_state_event(l_ptr, TRAFFIC_EVT);
		l_ptr->stats.recv_states++;
		if (link_resetting(l_ptr))
			break;

		/* Compute gap between peer's next-sent and our next-expected */
		if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
			rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt);

		if (msg_probe(msg))
			l_ptr->stats.recv_probes++;

		/* Protocol message before retransmits, reduce loss risk */
		if (l_ptr->owner->bclink.recv_permitted)
			tipc_bclink_update_link_state(l_ptr->owner,
						      msg_last_bcast(msg));

		/* Answer probes and report any reception gap to peer */
		if (rec_gap || (msg_probe(msg))) {
			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
					     rec_gap, 0, 0);
		}
		if (msg_seq_gap(msg)) {
			/* Peer reported a gap: retransmit the missing pkts */
			l_ptr->stats.recv_nacks++;
			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
					     msg_seq_gap(msg));
		}
		break;
	}
exit:
	kfree_skb(buf);
}
1495
/* tipc_link_build_proto_msg: prepare link protocol message for transmission
 * @l: link to build the message for
 * @mtyp: message type (RESET_MSG, ACTIVATE_MSG or STATE_MSG)
 * @probe: set probe bit in a STATE_MSG
 * @rcvgap: reception gap to report to peer (STATE_MSG only)
 * @tolerance: link tolerance to advertise
 * @priority: link priority to advertise
 * @xmitq: queue the created message is placed on (at the head)
 *
 * May return without queuing anything if sending is not permitted
 * or allocation fails.
 */
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
				      u16 rcvgap, int tolerance, int priority,
				      struct sk_buff_head *xmitq)
{
	struct sk_buff *skb = NULL;
	struct tipc_msg *hdr = l->pmsg;
	u16 snd_nxt = l->snd_nxt;
	u16 rcv_nxt = l->rcv_nxt;
	u16 rcv_last = rcv_nxt - 1;
	int node_up = l->owner->bclink.recv_permitted;

	/* Don't send protocol message during reset or link failover */
	if (l->exec_mode == TIPC_LINK_BLOCKED)
		return;

	/* Abort non-RESET send if communication with node is prohibited */
	if ((tipc_node_blocked(l->owner)) && (mtyp != RESET_MSG))
		return;

	msg_set_type(hdr, mtyp);
	msg_set_net_plane(hdr, l->net_plane);
	msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
	msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net));
	msg_set_link_tolerance(hdr, tolerance);
	msg_set_linkprio(hdr, priority);
	msg_set_redundant_link(hdr, node_up);
	msg_set_seq_gap(hdr, 0);

	/* Compatibility: created msg must not be in sequence with pkt flow */
	msg_set_seqno(hdr, snd_nxt + U16_MAX / 2);

	if (mtyp == STATE_MSG) {
		if (!tipc_link_is_up(l))
			return;
		msg_set_next_sent(hdr, snd_nxt);

		/* Override rcvgap if there are packets in deferred queue */
		if (!skb_queue_empty(&l->deferdq))
			rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt;
		if (rcvgap) {
			msg_set_seq_gap(hdr, rcvgap);
			l->stats.sent_nacks++;
		}
		msg_set_ack(hdr, rcv_last);
		msg_set_probe(hdr, probe);
		if (probe)
			l->stats.sent_probes++;
		l->stats.sent_states++;
	} else {
		/* RESET_MSG or ACTIVATE_MSG */
		msg_set_max_pkt(hdr, l->advertised_mtu);
		msg_set_ack(hdr, l->failover_checkpt - 1);
		msg_set_next_sent(hdr, 1);
	}
	/* Copy the prototype header into a freshly allocated buffer */
	skb = tipc_buf_acquire(msg_size(hdr));
	if (!skb)
		return;
	skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
	skb->priority = TC_PRIO_CONTROL;
	__skb_queue_head(xmitq, skb);
}
1559
/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
 * a different bearer. Owner node is locked.
 *
 * Wraps @msg inside @tunnel_hdr and sends it over the active parallel
 * link selected by @selector. Silently drops the packet (with a
 * warning) if no tunnel link is up or allocation fails.
 */
static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
				  struct tipc_msg *tunnel_hdr,
				  struct tipc_msg *msg,
				  u32 selector)
{
	struct tipc_link *tunnel;
	struct sk_buff *skb;
	u32 length = msg_size(msg);

	tunnel = node_active_link(l_ptr->owner, selector & 1);
	if (!tipc_link_is_up(tunnel)) {
		pr_warn("%stunnel link no longer available\n", link_co_err);
		return;
	}
	/* Tunnelled packet = tunnel header + original packet */
	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
	skb = tipc_buf_acquire(length + INT_H_SIZE);
	if (!skb) {
		pr_warn("%sunable to send tunnel msg\n", link_co_err);
		return;
	}
	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
	__tipc_link_xmit_skb(tunnel, skb);
}
1587
1588
1589 /* tipc_link_failover_send_queue(): A link has gone down, but a second
1590  * link is still active. We can do failover. Tunnel the failing link's
1591  * whole send queue via the remaining link. This way, we don't lose
1592  * any packets, and sequence order is preserved for subsequent traffic
1593  * sent over the remaining link. Owner node is locked.
1594  */
1595 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1596 {
1597         int msgcount;
1598         struct tipc_link *tunnel = node_active_link(l_ptr->owner, 0);
1599         struct tipc_msg tunnel_hdr;
1600         struct sk_buff *skb;
1601         int split_bundles;
1602
1603         if (!tunnel)
1604                 return;
1605
1606         tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
1607                       FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);
1608
1609         skb_queue_walk(&l_ptr->backlogq, skb) {
1610                 msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt);
1611                 l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1);
1612         }
1613         skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
1614         tipc_link_purge_backlog(l_ptr);
1615         msgcount = skb_queue_len(&l_ptr->transmq);
1616         msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1617         msg_set_msgcnt(&tunnel_hdr, msgcount);
1618
1619         if (skb_queue_empty(&l_ptr->transmq)) {
1620                 skb = tipc_buf_acquire(INT_H_SIZE);
1621                 if (skb) {
1622                         skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1623                         msg_set_size(&tunnel_hdr, INT_H_SIZE);
1624                         __tipc_link_xmit_skb(tunnel, skb);
1625                 } else {
1626                         pr_warn("%sunable to send changeover msg\n",
1627                                 link_co_err);
1628                 }
1629                 return;
1630         }
1631
1632         split_bundles = (node_active_link(l_ptr->owner, 0) !=
1633                          node_active_link(l_ptr->owner, 0));
1634
1635         skb_queue_walk(&l_ptr->transmq, skb) {
1636                 struct tipc_msg *msg = buf_msg(skb);
1637
1638                 if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1639                         struct tipc_msg *m = msg_get_wrapped(msg);
1640                         unchar *pos = (unchar *)m;
1641
1642                         msgcount = msg_msgcnt(msg);
1643                         while (msgcount--) {
1644                                 msg_set_seqno(m, msg_seqno(msg));
1645                                 tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1646                                                       msg_link_selector(m));
1647                                 pos += align(msg_size(m));
1648                                 m = (struct tipc_msg *)pos;
1649                         }
1650                 } else {
1651                         tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1652                                               msg_link_selector(msg));
1653                 }
1654         }
1655 }
1656
1657 /* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1658  * duplicate of the first link's send queue via the new link. This way, we
1659  * are guaranteed that currently queued packets from a socket are delivered
1660  * before future traffic from the same socket, even if this is using the
1661  * new link. The last arriving copy of each duplicate packet is dropped at
1662  * the receiving end by the regular protocol check, so packet cardinality
1663  * and sequence order is preserved per sender/receiver socket pair.
1664  * Owner node is locked.
1665  */
void tipc_link_dup_queue_xmit(struct tipc_link *link,
			      struct tipc_link *tnl)
{
	struct sk_buff *skb;
	struct tipc_msg tnl_hdr;
	struct sk_buff_head *queue = &link->transmq;	/* first pass: transmq */
	int mcnt;
	u16 seqno;

	/* Build the SYNCH_MSG tunnel header once; msgcnt tells the peer
	 * how many duplicates to expect across both queues.
	 */
	tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
		      SYNCH_MSG, INT_H_SIZE, link->addr);
	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
	msg_set_msgcnt(&tnl_hdr, mcnt);
	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);

	/* Two passes: first the transmit queue, then (via the goto below)
	 * the backlog queue after it has been given sequence numbers.
	 */
tunnel_queue:
	skb_queue_walk(queue, skb) {
		struct sk_buff *outskb;
		struct tipc_msg *msg = buf_msg(skb);
		u32 len = msg_size(msg);

		/* Refresh acknowledge info before duplicating the packet */
		msg_set_ack(msg, mod(link->rcv_nxt - 1));
		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
		outskb = tipc_buf_acquire(len + INT_H_SIZE);
		if (outskb == NULL) {
			pr_warn("%sunable to send duplicate msg\n",
				link_co_err);
			return;
		}
		/* Tunnelled packet = tunnel header + original packet */
		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
					       skb->data, len);
		__tipc_link_xmit_skb(tnl, outskb);
		/* Abort if the synched link went down meanwhile */
		if (!tipc_link_is_up(link))
			return;
	}
	if (queue == &link->backlogq)
		return;
	/* Assign sequence numbers to the backlog before tunneling it */
	seqno = link->snd_nxt;
	skb_queue_walk(&link->backlogq, skb) {
		msg_set_seqno(buf_msg(skb), seqno);
		seqno = mod(seqno + 1);
	}
	queue = &link->backlogq;
	goto tunnel_queue;
}
1713
1714 /*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
1715  *  Owner node is locked.
1716  */
static bool tipc_link_failover_rcv(struct tipc_link *link,
				   struct sk_buff **skb)
{
	struct tipc_msg *msg = buf_msg(*skb);
	struct sk_buff *iskb = NULL;	/* extracted inner packet, if any */
	struct tipc_link *pl = NULL;	/* parallel (failed) link */
	int bearer_id = msg_bearer_id(msg);
	int pos = 0;

	if (msg_type(msg) != FAILOVER_MSG) {
		pr_warn("%sunknown tunnel pkt received\n", link_co_err);
		goto exit;
	}
	/* Sanity check: bearer id must be valid and must not be our own */
	if (bearer_id >= MAX_BEARERS)
		goto exit;

	if (bearer_id == link->bearer_id)
		goto exit;

	/* Make sure the link we failed over from is really down */
	pl = link->owner->links[bearer_id].link;
	if (pl && tipc_link_is_up(pl))
		tipc_link_reset(pl);

	/* The first failover packet carries the total count to expect */
	if (link->failover_pkts == FIRST_FAILOVER)
		link->failover_pkts = msg_msgcnt(msg);

	/* Should we expect an inner packet? */
	if (!link->failover_pkts)
		goto exit;

	if (!tipc_msg_extract(*skb, &iskb, &pos)) {
		pr_warn("%sno inner failover pkt\n", link_co_err);
		*skb = NULL;
		goto exit;
	}
	link->failover_pkts--;
	*skb = NULL;

	/* Was this packet already delivered? */
	if (less(buf_seqno(iskb), link->failover_checkpt)) {
		kfree_skb(iskb);
		iskb = NULL;
		goto exit;
	}
	/* Reassemble fragments; iskb stays NULL until a message completes */
	if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
		link->stats.recv_fragments++;
		tipc_buf_append(&link->failover_skb, &iskb);
	}
exit:
	/* Re-open the parallel link once all failover packets arrived */
	if (!link->failover_pkts && pl)
		pl->exec_mode = TIPC_LINK_OPEN;
	kfree_skb(*skb);
	/* Hand any complete inner packet back to the caller */
	*skb = iskb;
	return *skb;
}
1772
1773 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1774 {
1775         unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
1776
1777         if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1778                 return;
1779
1780         l_ptr->tolerance = tol;
1781         l_ptr->keepalive_intv = msecs_to_jiffies(intv);
1782         l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->keepalive_intv));
1783 }
1784
1785 void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1786 {
1787         int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
1788
1789         l->window = win;
1790         l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2;
1791         l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win;
1792         l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3;
1793         l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
1794         l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
1795 }
1796
1797 /* tipc_link_find_owner - locate owner node of link by link's name
1798  * @net: the applicable net namespace
1799  * @name: pointer to link name string
1800  * @bearer_id: pointer to index in 'node->links' array where the link was found.
1801  *
1802  * Returns pointer to node owning the link, or 0 if no matching link is found.
1803  */
1804 static struct tipc_node *tipc_link_find_owner(struct net *net,
1805                                               const char *link_name,
1806                                               unsigned int *bearer_id)
1807 {
1808         struct tipc_net *tn = net_generic(net, tipc_net_id);
1809         struct tipc_link *l_ptr;
1810         struct tipc_node *n_ptr;
1811         struct tipc_node *found_node = NULL;
1812         int i;
1813
1814         *bearer_id = 0;
1815         rcu_read_lock();
1816         list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
1817                 tipc_node_lock(n_ptr);
1818                 for (i = 0; i < MAX_BEARERS; i++) {
1819                         l_ptr = n_ptr->links[i].link;
1820                         if (l_ptr && !strcmp(l_ptr->name, link_name)) {
1821                                 *bearer_id = i;
1822                                 found_node = n_ptr;
1823                                 break;
1824                         }
1825                 }
1826                 tipc_node_unlock(n_ptr);
1827                 if (found_node)
1828                         break;
1829         }
1830         rcu_read_unlock();
1831
1832         return found_node;
1833 }
1834
1835 /**
1836  * link_reset_statistics - reset link statistics
1837  * @l_ptr: pointer to link
1838  */
1839 static void link_reset_statistics(struct tipc_link *l_ptr)
1840 {
1841         memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
1842         l_ptr->stats.sent_info = l_ptr->snd_nxt;
1843         l_ptr->stats.recv_info = l_ptr->rcv_nxt;
1844 }
1845
1846 static void link_print(struct tipc_link *l_ptr, const char *str)
1847 {
1848         struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
1849         struct tipc_bearer *b_ptr;
1850
1851         rcu_read_lock();
1852         b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
1853         if (b_ptr)
1854                 pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
1855         rcu_read_unlock();
1856
1857         if (link_probing(l_ptr))
1858                 pr_cont(":P\n");
1859         else if (link_establishing(l_ptr))
1860                 pr_cont(":E\n");
1861         else if (link_resetting(l_ptr))
1862                 pr_cont(":R\n");
1863         else if (link_working(l_ptr))
1864                 pr_cont(":W\n");
1865         else
1866                 pr_cont("\n");
1867 }
1868
1869 /* Parse and validate nested (link) properties valid for media, bearer and link
1870  */
1871 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
1872 {
1873         int err;
1874
1875         err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
1876                                tipc_nl_prop_policy);
1877         if (err)
1878                 return err;
1879
1880         if (props[TIPC_NLA_PROP_PRIO]) {
1881                 u32 prio;
1882
1883                 prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1884                 if (prio > TIPC_MAX_LINK_PRI)
1885                         return -EINVAL;
1886         }
1887
1888         if (props[TIPC_NLA_PROP_TOL]) {
1889                 u32 tol;
1890
1891                 tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1892                 if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1893                         return -EINVAL;
1894         }
1895
1896         if (props[TIPC_NLA_PROP_WIN]) {
1897                 u32 win;
1898
1899                 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1900                 if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
1901                         return -EINVAL;
1902         }
1903
1904         return 0;
1905 }
1906
1907 int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
1908 {
1909         int err;
1910         int res = 0;
1911         int bearer_id;
1912         char *name;
1913         struct tipc_link *link;
1914         struct tipc_node *node;
1915         struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1916         struct net *net = sock_net(skb->sk);
1917
1918         if (!info->attrs[TIPC_NLA_LINK])
1919                 return -EINVAL;
1920
1921         err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
1922                                info->attrs[TIPC_NLA_LINK],
1923                                tipc_nl_link_policy);
1924         if (err)
1925                 return err;
1926
1927         if (!attrs[TIPC_NLA_LINK_NAME])
1928                 return -EINVAL;
1929
1930         name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
1931
1932         if (strcmp(name, tipc_bclink_name) == 0)
1933                 return tipc_nl_bc_link_set(net, attrs);
1934
1935         node = tipc_link_find_owner(net, name, &bearer_id);
1936         if (!node)
1937                 return -EINVAL;
1938
1939         tipc_node_lock(node);
1940
1941         link = node->links[bearer_id].link;
1942         if (!link) {
1943                 res = -EINVAL;
1944                 goto out;
1945         }
1946
1947         if (attrs[TIPC_NLA_LINK_PROP]) {
1948                 struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
1949
1950                 err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
1951                                               props);
1952                 if (err) {
1953                         res = err;
1954                         goto out;
1955                 }
1956
1957                 if (props[TIPC_NLA_PROP_TOL]) {
1958                         u32 tol;
1959
1960                         tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1961                         link_set_supervision_props(link, tol);
1962                         tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);
1963                 }
1964                 if (props[TIPC_NLA_PROP_PRIO]) {
1965                         u32 prio;
1966
1967                         prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1968                         link->priority = prio;
1969                         tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio);
1970                 }
1971                 if (props[TIPC_NLA_PROP_WIN]) {
1972                         u32 win;
1973
1974                         win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1975                         tipc_link_set_queue_limits(link, win);
1976                 }
1977         }
1978
1979 out:
1980         tipc_node_unlock(node);
1981
1982         return res;
1983 }
1984
static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
{
	int i;
	struct nlattr *stats;

	/* Local key/value pair type for the attribute table below */
	struct nla_map {
		u32 key;
		u32 val;
	};

	/* One entry per exported link statistics attribute */
	struct nla_map map[] = {
		{TIPC_NLA_STATS_RX_INFO, s->recv_info},
		{TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
		{TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
		{TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
		{TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
		{TIPC_NLA_STATS_TX_INFO, s->sent_info},
		{TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
		{TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
		{TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
		{TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
		/* Avoid division by zero downstream when no profiling data */
		{TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
			s->msg_length_counts : 1},
		{TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
		{TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
		{TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
		{TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
		{TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
		{TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
		{TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
		{TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
		{TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
		{TIPC_NLA_STATS_RX_STATES, s->recv_states},
		{TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
		{TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
		{TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
		{TIPC_NLA_STATS_TX_STATES, s->sent_states},
		{TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
		{TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
		{TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
		{TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
		{TIPC_NLA_STATS_DUPLICATES, s->duplicates},
		{TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
		{TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
		{TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
			(s->accu_queue_sz / s->queue_sz_counts) : 0}
	};

	stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
	if (!stats)
		return -EMSGSIZE;

	for (i = 0; i <  ARRAY_SIZE(map); i++)
		if (nla_put_u32(skb, map[i].key, map[i].val))
			goto msg_full;

	nla_nest_end(skb, stats);

	return 0;
msg_full:
	/* Roll back the partially filled nest on overflow */
	nla_nest_cancel(skb, stats);

	return -EMSGSIZE;
}
2049
2050 /* Caller should hold appropriate locks to protect the link */
2051 static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
2052                               struct tipc_link *link, int nlflags)
2053 {
2054         int err;
2055         void *hdr;
2056         struct nlattr *attrs;
2057         struct nlattr *prop;
2058         struct tipc_net *tn = net_generic(net, tipc_net_id);
2059
2060         hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
2061                           nlflags, TIPC_NL_LINK_GET);
2062         if (!hdr)
2063                 return -EMSGSIZE;
2064
2065         attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
2066         if (!attrs)
2067                 goto msg_full;
2068
2069         if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
2070                 goto attr_msg_full;
2071         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
2072                         tipc_cluster_mask(tn->own_addr)))
2073                 goto attr_msg_full;
2074         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->mtu))
2075                 goto attr_msg_full;
2076         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->rcv_nxt))
2077                 goto attr_msg_full;
2078         if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->snd_nxt))
2079                 goto attr_msg_full;
2080
2081         if (tipc_link_is_up(link))
2082                 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
2083                         goto attr_msg_full;
2084         if (tipc_link_is_active(link))
2085                 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
2086                         goto attr_msg_full;
2087
2088         prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
2089         if (!prop)
2090                 goto attr_msg_full;
2091         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2092                 goto prop_msg_full;
2093         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2094                 goto prop_msg_full;
2095         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
2096                         link->window))
2097                 goto prop_msg_full;
2098         if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2099                 goto prop_msg_full;
2100         nla_nest_end(msg->skb, prop);
2101
2102         err = __tipc_nl_add_stats(msg->skb, &link->stats);
2103         if (err)
2104                 goto attr_msg_full;
2105
2106         nla_nest_end(msg->skb, attrs);
2107         genlmsg_end(msg->skb, hdr);
2108
2109         return 0;
2110
2111 prop_msg_full:
2112         nla_nest_cancel(msg->skb, prop);
2113 attr_msg_full:
2114         nla_nest_cancel(msg->skb, attrs);
2115 msg_full:
2116         genlmsg_cancel(msg->skb, hdr);
2117
2118         return -EMSGSIZE;
2119 }
2120
2121 /* Caller should hold node lock  */
2122 static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
2123                                     struct tipc_node *node, u32 *prev_link)
2124 {
2125         u32 i;
2126         int err;
2127
2128         for (i = *prev_link; i < MAX_BEARERS; i++) {
2129                 *prev_link = i;
2130
2131                 if (!node->links[i].link)
2132                         continue;
2133
2134                 err = __tipc_nl_add_link(net, msg,
2135                                          node->links[i].link, NLM_F_MULTI);
2136                 if (err)
2137                         return err;
2138         }
2139         *prev_link = 0;
2140
2141         return 0;
2142 }
2143
int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
{
	struct net *net = sock_net(skb->sk);
	struct tipc_net *tn = net_generic(net, tipc_net_id);
	struct tipc_node *node;
	struct tipc_nl_msg msg;
	u32 prev_node = cb->args[0];	/* resume point: last dumped node */
	u32 prev_link = cb->args[1];	/* resume point: last dumped bearer */
	int done = cb->args[2];
	int err;

	if (done)
		return 0;

	msg.skb = skb;
	msg.portid = NETLINK_CB(cb->skb).portid;
	msg.seq = cb->nlh->nlmsg_seq;

	rcu_read_lock();
	if (prev_node) {
		/* Resuming an interrupted dump: re-locate the last node */
		node = tipc_node_find(net, prev_node);
		if (!node) {
			/* We never set seq or call nl_dump_check_consistent()
			 * this means that setting prev_seq here will cause the
			 * consistence check to fail in the netlink callback
			 * handler. Resulting in the last NLMSG_DONE message
			 * having the NLM_F_DUMP_INTR flag set.
			 */
			cb->prev_seq = 1;
			goto out;
		}
		/* Drop the find reference; RCU keeps the node valid below */
		tipc_node_put(node);

		list_for_each_entry_continue_rcu(node, &tn->node_list,
						 list) {
			tipc_node_lock(node);
			err = __tipc_nl_add_node_links(net, &msg, node,
						       &prev_link);
			tipc_node_unlock(node);
			if (err)
				goto out;

			prev_node = node->addr;
		}
	} else {
		/* Fresh dump: broadcast link first, then all node links */
		err = tipc_nl_add_bc_link(net, &msg);
		if (err)
			goto out;

		list_for_each_entry_rcu(node, &tn->node_list, list) {
			tipc_node_lock(node);
			err = __tipc_nl_add_node_links(net, &msg, node,
						       &prev_link);
			tipc_node_unlock(node);
			if (err)
				goto out;

			prev_node = node->addr;
		}
	}
	done = 1;
out:
	rcu_read_unlock();

	/* Save resume state for the next invocation of this callback */
	cb->args[0] = prev_node;
	cb->args[1] = prev_link;
	cb->args[2] = done;

	return skb->len;
}
2214
2215 int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
2216 {
2217         struct net *net = genl_info_net(info);
2218         struct tipc_nl_msg msg;
2219         char *name;
2220         int err;
2221
2222         msg.portid = info->snd_portid;
2223         msg.seq = info->snd_seq;
2224
2225         if (!info->attrs[TIPC_NLA_LINK_NAME])
2226                 return -EINVAL;
2227         name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
2228
2229         msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2230         if (!msg.skb)
2231                 return -ENOMEM;
2232
2233         if (strcmp(name, tipc_bclink_name) == 0) {
2234                 err = tipc_nl_add_bc_link(net, &msg);
2235                 if (err) {
2236                         nlmsg_free(msg.skb);
2237                         return err;
2238                 }
2239         } else {
2240                 int bearer_id;
2241                 struct tipc_node *node;
2242                 struct tipc_link *link;
2243
2244                 node = tipc_link_find_owner(net, name, &bearer_id);
2245                 if (!node)
2246                         return -EINVAL;
2247
2248                 tipc_node_lock(node);
2249                 link = node->links[bearer_id].link;
2250                 if (!link) {
2251                         tipc_node_unlock(node);
2252                         nlmsg_free(msg.skb);
2253                         return -EINVAL;
2254                 }
2255
2256                 err = __tipc_nl_add_link(net, &msg, link, 0);
2257                 tipc_node_unlock(node);
2258                 if (err) {
2259                         nlmsg_free(msg.skb);
2260                         return err;
2261                 }
2262         }
2263
2264         return genlmsg_reply(msg.skb, info);
2265 }
2266
2267 int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2268 {
2269         int err;
2270         char *link_name;
2271         unsigned int bearer_id;
2272         struct tipc_link *link;
2273         struct tipc_node *node;
2274         struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2275         struct net *net = sock_net(skb->sk);
2276
2277         if (!info->attrs[TIPC_NLA_LINK])
2278                 return -EINVAL;
2279
2280         err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2281                                info->attrs[TIPC_NLA_LINK],
2282                                tipc_nl_link_policy);
2283         if (err)
2284                 return err;
2285
2286         if (!attrs[TIPC_NLA_LINK_NAME])
2287                 return -EINVAL;
2288
2289         link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2290
2291         if (strcmp(link_name, tipc_bclink_name) == 0) {
2292                 err = tipc_bclink_reset_stats(net);
2293                 if (err)
2294                         return err;
2295                 return 0;
2296         }
2297
2298         node = tipc_link_find_owner(net, link_name, &bearer_id);
2299         if (!node)
2300                 return -EINVAL;
2301
2302         tipc_node_lock(node);
2303
2304         link = node->links[bearer_id].link;
2305         if (!link) {
2306                 tipc_node_unlock(node);
2307                 return -EINVAL;
2308         }
2309
2310         link_reset_statistics(link);
2311
2312         tipc_node_unlock(node);
2313
2314         return 0;
2315 }