7b027e99a5c945e2c96d7c95581905322a195897
[cascardo/linux.git] / net / tipc / port.c
1 /*
2  * net/tipc/port.c: TIPC port code
3  *
4  * Copyright (c) 1992-2007, 2014, Ericsson AB
5  * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6  * All rights reserved.
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in the
15  *    documentation and/or other materials provided with the distribution.
16  * 3. Neither the names of the copyright holders nor the names of its
17  *    contributors may be used to endorse or promote products derived from
18  *    this software without specific prior written permission.
19  *
20  * Alternatively, this software may be distributed under the terms of the
21  * GNU General Public License ("GPL") version 2 as published by the Free
22  * Software Foundation.
23  *
24  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34  * POSSIBILITY OF SUCH DAMAGE.
35  */
36
37 #include "core.h"
38 #include "config.h"
39 #include "port.h"
40 #include "name_table.h"
41 #include "socket.h"
42
43 /* Connection management: */
44 #define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
45 #define CONFIRMED 0
46 #define PROBING 1
47
48 #define MAX_REJECT_SIZE 1024
49
50 DEFINE_SPINLOCK(tipc_port_list_lock);
51
52 static LIST_HEAD(ports);
53 static void port_handle_node_down(unsigned long ref);
54 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
55 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
56 static void port_timeout(unsigned long ref);
57
58 /**
59  * tipc_port_peer_msg - verify message was sent by connected port's peer
60  *
61  * Handles cases where the node's network address has changed from
62  * the default of <0.0.0> to its configured setting.
63  */
64 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
65 {
66         u32 peernode;
67         u32 orignode;
68
69         if (msg_origport(msg) != tipc_port_peerport(p_ptr))
70                 return 0;
71
72         orignode = msg_orignode(msg);
73         peernode = tipc_port_peernode(p_ptr);
74         return (orignode == peernode) ||
75                 (!orignode && (peernode == tipc_own_addr)) ||
76                 (!peernode && (orignode == tipc_own_addr));
77 }
78
79 /**
80  * tipc_port_mcast_xmit - send a multicast message to local and remote
81  * destinations
82  */
83 int tipc_port_mcast_xmit(u32 ref, struct tipc_name_seq const *seq,
84                          struct iovec const *msg_sect, unsigned int len)
85 {
86         struct tipc_msg *hdr;
87         struct sk_buff *buf;
88         struct sk_buff *ibuf = NULL;
89         struct tipc_port_list dports = {0, NULL, };
90         struct tipc_port *oport = tipc_port_deref(ref);
91         int ext_targets;
92         int res;
93
94         if (unlikely(!oport))
95                 return -EINVAL;
96
97         /* Create multicast message */
98         hdr = &oport->phdr;
99         msg_set_type(hdr, TIPC_MCAST_MSG);
100         msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
101         msg_set_destport(hdr, 0);
102         msg_set_destnode(hdr, 0);
103         msg_set_nametype(hdr, seq->type);
104         msg_set_namelower(hdr, seq->lower);
105         msg_set_nameupper(hdr, seq->upper);
106         msg_set_hdr_sz(hdr, MCAST_H_SIZE);
107         res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
108         if (unlikely(!buf))
109                 return res;
110
111         /* Figure out where to send multicast message */
112         ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
113                                                 TIPC_NODE_SCOPE, &dports);
114
115         /* Send message to destinations (duplicate it only if necessary) */
116         if (ext_targets) {
117                 if (dports.count != 0) {
118                         ibuf = skb_copy(buf, GFP_ATOMIC);
119                         if (ibuf == NULL) {
120                                 tipc_port_list_free(&dports);
121                                 kfree_skb(buf);
122                                 return -ENOMEM;
123                         }
124                 }
125                 res = tipc_bclink_xmit(buf);
126                 if ((res < 0) && (dports.count != 0))
127                         kfree_skb(ibuf);
128         } else {
129                 ibuf = buf;
130         }
131
132         if (res >= 0) {
133                 if (ibuf)
134                         tipc_port_mcast_rcv(ibuf, &dports);
135         } else {
136                 tipc_port_list_free(&dports);
137         }
138         return res;
139 }
140
141 /**
142  * tipc_port_mcast_rcv - deliver multicast message to all destination ports
143  *
144  * If there is no port list, perform a lookup to create one
145  */
146 void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
147 {
148         struct tipc_msg *msg;
149         struct tipc_port_list dports = {0, NULL, };
150         struct tipc_port_list *item = dp;
151         int cnt = 0;
152
153         msg = buf_msg(buf);
154
155         /* Create destination port list, if one wasn't supplied */
156         if (dp == NULL) {
157                 tipc_nametbl_mc_translate(msg_nametype(msg),
158                                      msg_namelower(msg),
159                                      msg_nameupper(msg),
160                                      TIPC_CLUSTER_SCOPE,
161                                      &dports);
162                 item = dp = &dports;
163         }
164
165         /* Deliver a copy of message to each destination port */
166         if (dp->count != 0) {
167                 msg_set_destnode(msg, tipc_own_addr);
168                 if (dp->count == 1) {
169                         msg_set_destport(msg, dp->ports[0]);
170                         tipc_port_rcv(buf);
171                         tipc_port_list_free(dp);
172                         return;
173                 }
174                 for (; cnt < dp->count; cnt++) {
175                         int index = cnt % PLSIZE;
176                         struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
177
178                         if (b == NULL) {
179                                 pr_warn("Unable to deliver multicast message(s)\n");
180                                 goto exit;
181                         }
182                         if ((index == 0) && (cnt != 0))
183                                 item = item->next;
184                         msg_set_destport(buf_msg(b), item->ports[index]);
185                         tipc_port_rcv(b);
186                 }
187         }
188 exit:
189         kfree_skb(buf);
190         tipc_port_list_free(dp);
191 }
192
193 /**
194  * tipc_createport - create a generic TIPC port
195  *
196  * Returns pointer to (locked) TIPC port, or NULL if unable to create it
197  */
198 struct tipc_port *tipc_createport(struct sock *sk,
199                                   u32 (*dispatcher)(struct tipc_port *,
200                                   struct sk_buff *),
201                                   void (*wakeup)(struct tipc_port *),
202                                   const u32 importance)
203 {
204         struct tipc_port *p_ptr = tipc_sk_port(sk);
205         struct tipc_msg *msg;
206         u32 ref;
207
208         ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
209         if (!ref) {
210                 pr_warn("Port registration failed, ref. table exhausted\n");
211                 return NULL;
212         }
213
214         p_ptr->max_pkt = MAX_PKT_DEFAULT;
215         p_ptr->ref = ref;
216         INIT_LIST_HEAD(&p_ptr->wait_list);
217         INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
218         p_ptr->dispatcher = dispatcher;
219         p_ptr->wakeup = wakeup;
220         k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
221         INIT_LIST_HEAD(&p_ptr->publications);
222         INIT_LIST_HEAD(&p_ptr->port_list);
223
224         /*
225          * Must hold port list lock while initializing message header template
226          * to ensure a change to node's own network address doesn't result
227          * in template containing out-dated network address information
228          */
229         spin_lock_bh(&tipc_port_list_lock);
230         msg = &p_ptr->phdr;
231         tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
232         msg_set_origport(msg, ref);
233         list_add_tail(&p_ptr->port_list, &ports);
234         spin_unlock_bh(&tipc_port_list_lock);
235         return p_ptr;
236 }
237
238 int tipc_deleteport(struct tipc_port *p_ptr)
239 {
240         struct sk_buff *buf = NULL;
241
242         tipc_withdraw(p_ptr, 0, NULL);
243
244         spin_lock_bh(p_ptr->lock);
245         tipc_ref_discard(p_ptr->ref);
246         spin_unlock_bh(p_ptr->lock);
247
248         k_cancel_timer(&p_ptr->timer);
249         if (p_ptr->connected) {
250                 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
251                 tipc_nodesub_unsubscribe(&p_ptr->subscription);
252         }
253
254         spin_lock_bh(&tipc_port_list_lock);
255         list_del(&p_ptr->port_list);
256         list_del(&p_ptr->wait_list);
257         spin_unlock_bh(&tipc_port_list_lock);
258         k_term_timer(&p_ptr->timer);
259         tipc_net_route_msg(buf);
260         return 0;
261 }
262
263 static int port_unreliable(struct tipc_port *p_ptr)
264 {
265         return msg_src_droppable(&p_ptr->phdr);
266 }
267
268 int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
269 {
270         struct tipc_port *p_ptr;
271
272         p_ptr = tipc_port_lock(ref);
273         if (!p_ptr)
274                 return -EINVAL;
275         *isunreliable = port_unreliable(p_ptr);
276         tipc_port_unlock(p_ptr);
277         return 0;
278 }
279
280 int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
281 {
282         struct tipc_port *p_ptr;
283
284         p_ptr = tipc_port_lock(ref);
285         if (!p_ptr)
286                 return -EINVAL;
287         msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
288         tipc_port_unlock(p_ptr);
289         return 0;
290 }
291
292 static int port_unreturnable(struct tipc_port *p_ptr)
293 {
294         return msg_dest_droppable(&p_ptr->phdr);
295 }
296
297 int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
298 {
299         struct tipc_port *p_ptr;
300
301         p_ptr = tipc_port_lock(ref);
302         if (!p_ptr)
303                 return -EINVAL;
304         *isunrejectable = port_unreturnable(p_ptr);
305         tipc_port_unlock(p_ptr);
306         return 0;
307 }
308
309 int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
310 {
311         struct tipc_port *p_ptr;
312
313         p_ptr = tipc_port_lock(ref);
314         if (!p_ptr)
315                 return -EINVAL;
316         msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
317         tipc_port_unlock(p_ptr);
318         return 0;
319 }
320
321 /*
322  * port_build_proto_msg(): create connection protocol message for port
323  *
324  * On entry the port must be locked and connected.
325  */
326 static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
327                                             u32 type, u32 ack)
328 {
329         struct sk_buff *buf;
330         struct tipc_msg *msg;
331
332         buf = tipc_buf_acquire(INT_H_SIZE);
333         if (buf) {
334                 msg = buf_msg(buf);
335                 tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
336                               tipc_port_peernode(p_ptr));
337                 msg_set_destport(msg, tipc_port_peerport(p_ptr));
338                 msg_set_origport(msg, p_ptr->ref);
339                 msg_set_msgcnt(msg, ack);
340         }
341         return buf;
342 }
343
344 int tipc_reject_msg(struct sk_buff *buf, u32 err)
345 {
346         struct tipc_msg *msg = buf_msg(buf);
347         struct sk_buff *rbuf;
348         struct tipc_msg *rmsg;
349         int hdr_sz;
350         u32 imp;
351         u32 data_sz = msg_data_sz(msg);
352         u32 src_node;
353         u32 rmsg_sz;
354
355         /* discard rejected message if it shouldn't be returned to sender */
356         if (WARN(!msg_isdata(msg),
357                  "attempt to reject message with user=%u", msg_user(msg))) {
358                 dump_stack();
359                 goto exit;
360         }
361         if (msg_errcode(msg) || msg_dest_droppable(msg))
362                 goto exit;
363
364         /*
365          * construct returned message by copying rejected message header and
366          * data (or subset), then updating header fields that need adjusting
367          */
368         hdr_sz = msg_hdr_sz(msg);
369         rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
370
371         rbuf = tipc_buf_acquire(rmsg_sz);
372         if (rbuf == NULL)
373                 goto exit;
374
375         rmsg = buf_msg(rbuf);
376         skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
377
378         if (msg_connected(rmsg)) {
379                 imp = msg_importance(rmsg);
380                 if (imp < TIPC_CRITICAL_IMPORTANCE)
381                         msg_set_importance(rmsg, ++imp);
382         }
383         msg_set_non_seq(rmsg, 0);
384         msg_set_size(rmsg, rmsg_sz);
385         msg_set_errcode(rmsg, err);
386         msg_set_prevnode(rmsg, tipc_own_addr);
387         msg_swap_words(rmsg, 4, 5);
388         if (!msg_short(rmsg))
389                 msg_swap_words(rmsg, 6, 7);
390
391         /* send self-abort message when rejecting on a connected port */
392         if (msg_connected(msg)) {
393                 struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
394
395                 if (p_ptr) {
396                         struct sk_buff *abuf = NULL;
397
398                         if (p_ptr->connected)
399                                 abuf = port_build_self_abort_msg(p_ptr, err);
400                         tipc_port_unlock(p_ptr);
401                         tipc_net_route_msg(abuf);
402                 }
403         }
404
405         /* send returned message & dispose of rejected message */
406         src_node = msg_prevnode(msg);
407         if (in_own_node(src_node))
408                 tipc_port_rcv(rbuf);
409         else
410                 tipc_link_xmit(rbuf, src_node, msg_link_selector(rmsg));
411 exit:
412         kfree_skb(buf);
413         return data_sz;
414 }
415
416 int tipc_port_iovec_reject(struct tipc_port *p_ptr, struct tipc_msg *hdr,
417                            struct iovec const *msg_sect, unsigned int len,
418                            int err)
419 {
420         struct sk_buff *buf;
421         int res;
422
423         res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
424         if (!buf)
425                 return res;
426
427         return tipc_reject_msg(buf, err);
428 }
429
430 static void port_timeout(unsigned long ref)
431 {
432         struct tipc_port *p_ptr = tipc_port_lock(ref);
433         struct sk_buff *buf = NULL;
434
435         if (!p_ptr)
436                 return;
437
438         if (!p_ptr->connected) {
439                 tipc_port_unlock(p_ptr);
440                 return;
441         }
442
443         /* Last probe answered ? */
444         if (p_ptr->probing_state == PROBING) {
445                 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
446         } else {
447                 buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
448                 p_ptr->probing_state = PROBING;
449                 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
450         }
451         tipc_port_unlock(p_ptr);
452         tipc_net_route_msg(buf);
453 }
454
455
456 static void port_handle_node_down(unsigned long ref)
457 {
458         struct tipc_port *p_ptr = tipc_port_lock(ref);
459         struct sk_buff *buf = NULL;
460
461         if (!p_ptr)
462                 return;
463         buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
464         tipc_port_unlock(p_ptr);
465         tipc_net_route_msg(buf);
466 }
467
468
469 static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
470 {
471         struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
472
473         if (buf) {
474                 struct tipc_msg *msg = buf_msg(buf);
475                 msg_swap_words(msg, 4, 5);
476                 msg_swap_words(msg, 6, 7);
477         }
478         return buf;
479 }
480
481
482 static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
483 {
484         struct sk_buff *buf;
485         struct tipc_msg *msg;
486         u32 imp;
487
488         if (!p_ptr->connected)
489                 return NULL;
490
491         buf = tipc_buf_acquire(BASIC_H_SIZE);
492         if (buf) {
493                 msg = buf_msg(buf);
494                 memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
495                 msg_set_hdr_sz(msg, BASIC_H_SIZE);
496                 msg_set_size(msg, BASIC_H_SIZE);
497                 imp = msg_importance(msg);
498                 if (imp < TIPC_CRITICAL_IMPORTANCE)
499                         msg_set_importance(msg, ++imp);
500                 msg_set_errcode(msg, err);
501         }
502         return buf;
503 }
504
505 void tipc_port_proto_rcv(struct sk_buff *buf)
506 {
507         struct tipc_msg *msg = buf_msg(buf);
508         struct tipc_port *p_ptr;
509         struct sk_buff *r_buf = NULL;
510         u32 destport = msg_destport(msg);
511         int wakeable;
512
513         /* Validate connection */
514         p_ptr = tipc_port_lock(destport);
515         if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
516                 r_buf = tipc_buf_acquire(BASIC_H_SIZE);
517                 if (r_buf) {
518                         msg = buf_msg(r_buf);
519                         tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
520                                       BASIC_H_SIZE, msg_orignode(msg));
521                         msg_set_errcode(msg, TIPC_ERR_NO_PORT);
522                         msg_set_origport(msg, destport);
523                         msg_set_destport(msg, msg_origport(msg));
524                 }
525                 if (p_ptr)
526                         tipc_port_unlock(p_ptr);
527                 goto exit;
528         }
529
530         /* Process protocol message sent by peer */
531         switch (msg_type(msg)) {
532         case CONN_ACK:
533                 wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
534                         p_ptr->wakeup;
535                 p_ptr->acked += msg_msgcnt(msg);
536                 if (!tipc_port_congested(p_ptr)) {
537                         p_ptr->congested = 0;
538                         if (wakeable)
539                                 p_ptr->wakeup(p_ptr);
540                 }
541                 break;
542         case CONN_PROBE:
543                 r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
544                 break;
545         default:
546                 /* CONN_PROBE_REPLY or unrecognized - no action required */
547                 break;
548         }
549         p_ptr->probing_state = CONFIRMED;
550         tipc_port_unlock(p_ptr);
551 exit:
552         tipc_net_route_msg(r_buf);
553         kfree_skb(buf);
554 }
555
556 static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
557 {
558         struct publication *publ;
559         int ret;
560
561         if (full_id)
562                 ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
563                                     tipc_zone(tipc_own_addr),
564                                     tipc_cluster(tipc_own_addr),
565                                     tipc_node(tipc_own_addr), p_ptr->ref);
566         else
567                 ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);
568
569         if (p_ptr->connected) {
570                 u32 dport = tipc_port_peerport(p_ptr);
571                 u32 destnode = tipc_port_peernode(p_ptr);
572
573                 ret += tipc_snprintf(buf + ret, len - ret,
574                                      " connected to <%u.%u.%u:%u>",
575                                      tipc_zone(destnode),
576                                      tipc_cluster(destnode),
577                                      tipc_node(destnode), dport);
578                 if (p_ptr->conn_type != 0)
579                         ret += tipc_snprintf(buf + ret, len - ret,
580                                              " via {%u,%u}", p_ptr->conn_type,
581                                              p_ptr->conn_instance);
582         } else if (p_ptr->published) {
583                 ret += tipc_snprintf(buf + ret, len - ret, " bound to");
584                 list_for_each_entry(publ, &p_ptr->publications, pport_list) {
585                         if (publ->lower == publ->upper)
586                                 ret += tipc_snprintf(buf + ret, len - ret,
587                                                      " {%u,%u}", publ->type,
588                                                      publ->lower);
589                         else
590                                 ret += tipc_snprintf(buf + ret, len - ret,
591                                                      " {%u,%u,%u}", publ->type,
592                                                      publ->lower, publ->upper);
593                 }
594         }
595         ret += tipc_snprintf(buf + ret, len - ret, "\n");
596         return ret;
597 }
598
599 struct sk_buff *tipc_port_get_ports(void)
600 {
601         struct sk_buff *buf;
602         struct tlv_desc *rep_tlv;
603         char *pb;
604         int pb_len;
605         struct tipc_port *p_ptr;
606         int str_len = 0;
607
608         buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
609         if (!buf)
610                 return NULL;
611         rep_tlv = (struct tlv_desc *)buf->data;
612         pb = TLV_DATA(rep_tlv);
613         pb_len = ULTRA_STRING_MAX_LEN;
614
615         spin_lock_bh(&tipc_port_list_lock);
616         list_for_each_entry(p_ptr, &ports, port_list) {
617                 spin_lock_bh(p_ptr->lock);
618                 str_len += port_print(p_ptr, pb, pb_len, 0);
619                 spin_unlock_bh(p_ptr->lock);
620         }
621         spin_unlock_bh(&tipc_port_list_lock);
622         str_len += 1;   /* for "\0" */
623         skb_put(buf, TLV_SPACE(str_len));
624         TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
625
626         return buf;
627 }
628
629 void tipc_port_reinit(void)
630 {
631         struct tipc_port *p_ptr;
632         struct tipc_msg *msg;
633
634         spin_lock_bh(&tipc_port_list_lock);
635         list_for_each_entry(p_ptr, &ports, port_list) {
636                 msg = &p_ptr->phdr;
637                 msg_set_prevnode(msg, tipc_own_addr);
638                 msg_set_orignode(msg, tipc_own_addr);
639         }
640         spin_unlock_bh(&tipc_port_list_lock);
641 }
642
643 void tipc_acknowledge(u32 ref, u32 ack)
644 {
645         struct tipc_port *p_ptr;
646         struct sk_buff *buf = NULL;
647
648         p_ptr = tipc_port_lock(ref);
649         if (!p_ptr)
650                 return;
651         if (p_ptr->connected) {
652                 p_ptr->conn_unacked -= ack;
653                 buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
654         }
655         tipc_port_unlock(p_ptr);
656         tipc_net_route_msg(buf);
657 }
658
659 int tipc_portimportance(u32 ref, unsigned int *importance)
660 {
661         struct tipc_port *p_ptr;
662
663         p_ptr = tipc_port_lock(ref);
664         if (!p_ptr)
665                 return -EINVAL;
666         *importance = (unsigned int)msg_importance(&p_ptr->phdr);
667         tipc_port_unlock(p_ptr);
668         return 0;
669 }
670
671 int tipc_set_portimportance(u32 ref, unsigned int imp)
672 {
673         struct tipc_port *p_ptr;
674
675         if (imp > TIPC_CRITICAL_IMPORTANCE)
676                 return -EINVAL;
677
678         p_ptr = tipc_port_lock(ref);
679         if (!p_ptr)
680                 return -EINVAL;
681         msg_set_importance(&p_ptr->phdr, (u32)imp);
682         tipc_port_unlock(p_ptr);
683         return 0;
684 }
685
686
687 int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
688                  struct tipc_name_seq const *seq)
689 {
690         struct publication *publ;
691         u32 key;
692
693         if (p_ptr->connected)
694                 return -EINVAL;
695         key = p_ptr->ref + p_ptr->pub_count + 1;
696         if (key == p_ptr->ref)
697                 return -EADDRINUSE;
698
699         publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
700                                     scope, p_ptr->ref, key);
701         if (publ) {
702                 list_add(&publ->pport_list, &p_ptr->publications);
703                 p_ptr->pub_count++;
704                 p_ptr->published = 1;
705                 return 0;
706         }
707         return -EINVAL;
708 }
709
710 int tipc_withdraw(struct tipc_port *p_ptr, unsigned int scope,
711                   struct tipc_name_seq const *seq)
712 {
713         struct publication *publ;
714         struct publication *tpubl;
715         int res = -EINVAL;
716
717         if (!seq) {
718                 list_for_each_entry_safe(publ, tpubl,
719                                          &p_ptr->publications, pport_list) {
720                         tipc_nametbl_withdraw(publ->type, publ->lower,
721                                               publ->ref, publ->key);
722                 }
723                 res = 0;
724         } else {
725                 list_for_each_entry_safe(publ, tpubl,
726                                          &p_ptr->publications, pport_list) {
727                         if (publ->scope != scope)
728                                 continue;
729                         if (publ->type != seq->type)
730                                 continue;
731                         if (publ->lower != seq->lower)
732                                 continue;
733                         if (publ->upper != seq->upper)
734                                 break;
735                         tipc_nametbl_withdraw(publ->type, publ->lower,
736                                               publ->ref, publ->key);
737                         res = 0;
738                         break;
739                 }
740         }
741         if (list_empty(&p_ptr->publications))
742                 p_ptr->published = 0;
743         return res;
744 }
745
746 int tipc_port_connect(u32 ref, struct tipc_portid const *peer)
747 {
748         struct tipc_port *p_ptr;
749         int res;
750
751         p_ptr = tipc_port_lock(ref);
752         if (!p_ptr)
753                 return -EINVAL;
754         res = __tipc_port_connect(ref, p_ptr, peer);
755         tipc_port_unlock(p_ptr);
756         return res;
757 }
758
759 /*
760  * __tipc_port_connect - connect to a remote peer
761  *
762  * Port must be locked.
763  */
764 int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
765                         struct tipc_portid const *peer)
766 {
767         struct tipc_msg *msg;
768         int res = -EINVAL;
769
770         if (p_ptr->published || p_ptr->connected)
771                 goto exit;
772         if (!peer->ref)
773                 goto exit;
774
775         msg = &p_ptr->phdr;
776         msg_set_destnode(msg, peer->node);
777         msg_set_destport(msg, peer->ref);
778         msg_set_type(msg, TIPC_CONN_MSG);
779         msg_set_lookup_scope(msg, 0);
780         msg_set_hdr_sz(msg, SHORT_H_SIZE);
781
782         p_ptr->probing_interval = PROBING_INTERVAL;
783         p_ptr->probing_state = CONFIRMED;
784         p_ptr->connected = 1;
785         k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
786
787         tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
788                           (void *)(unsigned long)ref,
789                           (net_ev_handler)port_handle_node_down);
790         res = 0;
791 exit:
792         p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
793         return res;
794 }
795
796 /*
797  * __tipc_disconnect - disconnect port from peer
798  *
799  * Port must be locked.
800  */
801 int __tipc_port_disconnect(struct tipc_port *tp_ptr)
802 {
803         if (tp_ptr->connected) {
804                 tp_ptr->connected = 0;
805                 /* let timer expire on it's own to avoid deadlock! */
806                 tipc_nodesub_unsubscribe(&tp_ptr->subscription);
807                 return 0;
808         }
809
810         return -ENOTCONN;
811 }
812
813 /*
814  * tipc_port_disconnect(): Disconnect port form peer.
815  *                    This is a node local operation.
816  */
817 int tipc_port_disconnect(u32 ref)
818 {
819         struct tipc_port *p_ptr;
820         int res;
821
822         p_ptr = tipc_port_lock(ref);
823         if (!p_ptr)
824                 return -EINVAL;
825         res = __tipc_port_disconnect(p_ptr);
826         tipc_port_unlock(p_ptr);
827         return res;
828 }
829
830 /*
831  * tipc_port_shutdown(): Send a SHUTDOWN msg to peer and disconnect
832  */
833 int tipc_port_shutdown(u32 ref)
834 {
835         struct tipc_port *p_ptr;
836         struct sk_buff *buf = NULL;
837
838         p_ptr = tipc_port_lock(ref);
839         if (!p_ptr)
840                 return -EINVAL;
841
842         buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
843         tipc_port_unlock(p_ptr);
844         tipc_net_route_msg(buf);
845         return tipc_port_disconnect(ref);
846 }
847
848 /**
849  * tipc_port_rcv - receive message from lower layer and deliver to port user
850  */
851 int tipc_port_rcv(struct sk_buff *buf)
852 {
853         struct tipc_port *p_ptr;
854         struct tipc_msg *msg = buf_msg(buf);
855         u32 destport = msg_destport(msg);
856         u32 dsz = msg_data_sz(msg);
857         u32 err;
858
859         /* forward unresolved named message */
860         if (unlikely(!destport)) {
861                 tipc_net_route_msg(buf);
862                 return dsz;
863         }
864
865         /* validate destination & pass to port, otherwise reject message */
866         p_ptr = tipc_port_lock(destport);
867         if (likely(p_ptr)) {
868                 err = p_ptr->dispatcher(p_ptr, buf);
869                 tipc_port_unlock(p_ptr);
870                 if (likely(!err))
871                         return dsz;
872         } else {
873                 err = TIPC_ERR_NO_PORT;
874         }
875
876         return tipc_reject_msg(buf, err);
877 }
878
879 /*
880  *  tipc_port_iovec_rcv: Concatenate and deliver sectioned
881  *                       message for this node.
882  */
883 static int tipc_port_iovec_rcv(struct tipc_port *sender,
884                                struct iovec const *msg_sect,
885                                unsigned int len)
886 {
887         struct sk_buff *buf;
888         int res;
889
890         res = tipc_msg_build(&sender->phdr, msg_sect, len, MAX_MSG_SIZE, &buf);
891         if (likely(buf))
892                 tipc_port_rcv(buf);
893         return res;
894 }
895
896 /**
897  * tipc_send - send message sections on connection
898  */
899 int tipc_send(u32 ref, struct iovec const *msg_sect, unsigned int len)
900 {
901         struct tipc_port *p_ptr;
902         u32 destnode;
903         int res;
904
905         p_ptr = tipc_port_deref(ref);
906         if (!p_ptr || !p_ptr->connected)
907                 return -EINVAL;
908
909         p_ptr->congested = 1;
910         if (!tipc_port_congested(p_ptr)) {
911                 destnode = tipc_port_peernode(p_ptr);
912                 if (likely(!in_own_node(destnode)))
913                         res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
914                                                         destnode);
915                 else
916                         res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
917
918                 if (likely(res != -ELINKCONG)) {
919                         p_ptr->congested = 0;
920                         if (res > 0)
921                                 p_ptr->sent++;
922                         return res;
923                 }
924         }
925         if (port_unreliable(p_ptr)) {
926                 p_ptr->congested = 0;
927                 return len;
928         }
929         return -ELINKCONG;
930 }
931
932 /**
933  * tipc_send2name - send message sections to port name
934  */
935 int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
936                    struct iovec const *msg_sect, unsigned int len)
937 {
938         struct tipc_port *p_ptr;
939         struct tipc_msg *msg;
940         u32 destnode = domain;
941         u32 destport;
942         int res;
943
944         p_ptr = tipc_port_deref(ref);
945         if (!p_ptr || p_ptr->connected)
946                 return -EINVAL;
947
948         msg = &p_ptr->phdr;
949         msg_set_type(msg, TIPC_NAMED_MSG);
950         msg_set_hdr_sz(msg, NAMED_H_SIZE);
951         msg_set_nametype(msg, name->type);
952         msg_set_nameinst(msg, name->instance);
953         msg_set_lookup_scope(msg, tipc_addr_scope(domain));
954         destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
955         msg_set_destnode(msg, destnode);
956         msg_set_destport(msg, destport);
957
958         if (likely(destport || destnode)) {
959                 if (likely(in_own_node(destnode)))
960                         res = tipc_port_iovec_rcv(p_ptr, msg_sect, len);
961                 else if (tipc_own_addr)
962                         res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
963                                                         destnode);
964                 else
965                         res = tipc_port_iovec_reject(p_ptr, msg, msg_sect,
966                                                      len, TIPC_ERR_NO_NODE);
967                 if (likely(res != -ELINKCONG)) {
968                         if (res > 0)
969                                 p_ptr->sent++;
970                         return res;
971                 }
972                 if (port_unreliable(p_ptr)) {
973                         return len;
974                 }
975                 return -ELINKCONG;
976         }
977         return tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
978                                       TIPC_ERR_NO_NAME);
979 }
980
981 /**
982  * tipc_send2port - send message sections to port identity
983  */
984 int tipc_send2port(u32 ref, struct tipc_portid const *dest,
985                    struct iovec const *msg_sect, unsigned int len)
986 {
987         struct tipc_port *p_ptr;
988         struct tipc_msg *msg;
989         int res;
990
991         p_ptr = tipc_port_deref(ref);
992         if (!p_ptr || p_ptr->connected)
993                 return -EINVAL;
994
995         msg = &p_ptr->phdr;
996         msg_set_type(msg, TIPC_DIRECT_MSG);
997         msg_set_lookup_scope(msg, 0);
998         msg_set_destnode(msg, dest->node);
999         msg_set_destport(msg, dest->ref);
1000         msg_set_hdr_sz(msg, BASIC_H_SIZE);
1001
1002         if (in_own_node(dest->node))
1003                 res =  tipc_port_iovec_rcv(p_ptr, msg_sect, len);
1004         else if (tipc_own_addr)
1005                 res = tipc_link_iovec_xmit_fast(p_ptr, msg_sect, len,
1006                                                 dest->node);
1007         else
1008                 res = tipc_port_iovec_reject(p_ptr, msg, msg_sect, len,
1009                                                 TIPC_ERR_NO_NODE);
1010         if (likely(res != -ELINKCONG)) {
1011                 if (res > 0)
1012                         p_ptr->sent++;
1013                 return res;
1014         }
1015         if (port_unreliable(p_ptr)) {
1016                 return len;
1017         }
1018         return -ELINKCONG;
1019 }