tipc: introduce new link protocol msg create function
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 16 Jul 2015 20:54:26 +0000 (16:54 -0400)
committerDavid S. Miller <davem@davemloft.net>
Tue, 21 Jul 2015 03:41:15 +0000 (20:41 -0700)
As a preparation for later changes, we introduce a new function
tipc_link_build_proto_msg(). Instead of actually sending the created
protocol message, it only creates it and adds it to the head of a
skb queue provided by the caller.

Since we still need the existing function tipc_link_protocol_xmit()
for a while, we redesign it to make use of the new function.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c

index 35a2da6..657ba91 100644 (file)
@@ -129,6 +129,9 @@ static void tipc_link_proto_rcv(struct tipc_link *link,
                                struct sk_buff *skb);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
+static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
+                                     u16 rcvgap, int tolerance, int priority,
+                                     struct sk_buff_head *xmitq);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
 static void tipc_link_sync_xmit(struct tipc_link *l);
@@ -1323,77 +1326,21 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 /*
  * Send protocol message to the other endpoint.
  */
-void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
+void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
                          u32 gap, u32 tolerance, u32 priority)
 {
-       struct sk_buff *buf = NULL;
-       struct tipc_msg *msg = l_ptr->pmsg;
-       u32 msg_size = sizeof(l_ptr->proto_msg);
-       int r_flag;
-       u16 last_rcv;
+       struct sk_buff *skb = NULL;
+       struct sk_buff_head xmitq;
 
-       /* Don't send protocol message during link failover */
-       if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
-               return;
-
-       /* Abort non-RESET send if communication with node is prohibited */
-       if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
-               return;
-
-       /* Create protocol message with "out-of-sequence" sequence number */
-       msg_set_type(msg, msg_typ);
-       msg_set_net_plane(msg, l_ptr->net_plane);
-       msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-       msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net));
-
-       if (msg_typ == STATE_MSG) {
-               u16 next_sent = l_ptr->snd_nxt;
-
-               if (!tipc_link_is_up(l_ptr))
-                       return;
-               msg_set_next_sent(msg, next_sent);
-               if (!skb_queue_empty(&l_ptr->deferdq)) {
-                       last_rcv = buf_seqno(skb_peek(&l_ptr->deferdq));
-                       gap = mod(last_rcv - l_ptr->rcv_nxt);
-               }
-               msg_set_seq_gap(msg, gap);
-               if (gap)
-                       l_ptr->stats.sent_nacks++;
-               msg_set_link_tolerance(msg, tolerance);
-               msg_set_linkprio(msg, priority);
-               msg_set_max_pkt(msg, l_ptr->mtu);
-               msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
-               msg_set_probe(msg, probe_msg != 0);
-               if (probe_msg)
-                       l_ptr->stats.sent_probes++;
-               l_ptr->stats.sent_states++;
-       } else {                /* RESET_MSG or ACTIVATE_MSG */
-               msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1));
-               msg_set_seq_gap(msg, 0);
-               msg_set_next_sent(msg, 1);
-               msg_set_probe(msg, 0);
-               msg_set_link_tolerance(msg, l_ptr->tolerance);
-               msg_set_linkprio(msg, l_ptr->priority);
-               msg_set_max_pkt(msg, l_ptr->advertised_mtu);
-       }
-
-       r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
-       msg_set_redundant_link(msg, r_flag);
-       msg_set_linkprio(msg, l_ptr->priority);
-       msg_set_size(msg, msg_size);
-
-       msg_set_seqno(msg, mod(l_ptr->snd_nxt + (0xffff / 2)));
-
-       buf = tipc_buf_acquire(msg_size);
-       if (!buf)
+       __skb_queue_head_init(&xmitq);
+       tipc_link_build_proto_msg(l, msg_typ, probe_msg, gap,
+                                 tolerance, priority, &xmitq);
+       skb = __skb_dequeue(&xmitq);
+       if (!skb)
                return;
-
-       skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
-       buf->priority = TC_PRIO_CONTROL;
-       tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
-                        &l_ptr->media_addr);
-       l_ptr->rcv_unacked = 0;
-       kfree_skb(buf);
+       tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
+       l->rcv_unacked = 0;
+       kfree_skb(skb);
 }
 
 /*
@@ -1514,6 +1461,69 @@ exit:
        kfree_skb(buf);
 }
 
+/* tipc_link_build_proto_msg: prepare link protocol message for transmission
+ */
+static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
+                                     u16 rcvgap, int tolerance, int priority,
+                                     struct sk_buff_head *xmitq)
+{
+       struct sk_buff *skb = NULL;
+       struct tipc_msg *hdr = l->pmsg;
+       u16 snd_nxt = l->snd_nxt;
+       u16 rcv_nxt = l->rcv_nxt;
+       u16 rcv_last = rcv_nxt - 1;
+       int node_up = l->owner->bclink.recv_permitted;
+
+       /* Don't send protocol message during reset or link failover */
+       if (l->exec_mode == TIPC_LINK_BLOCKED)
+               return;
+
+       /* Abort non-RESET send if communication with node is prohibited */
+       if ((tipc_node_blocked(l->owner)) && (mtyp != RESET_MSG))
+               return;
+
+       msg_set_type(hdr, mtyp);
+       msg_set_net_plane(hdr, l->net_plane);
+       msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+       msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net));
+       msg_set_link_tolerance(hdr, tolerance);
+       msg_set_linkprio(hdr, priority);
+       msg_set_redundant_link(hdr, node_up);
+       msg_set_seq_gap(hdr, 0);
+
+       /* Compatibility: created msg must not be in sequence with pkt flow */
+       msg_set_seqno(hdr, snd_nxt + U16_MAX / 2);
+
+       if (mtyp == STATE_MSG) {
+               if (!tipc_link_is_up(l))
+                       return;
+               msg_set_next_sent(hdr, snd_nxt);
+
+               /* Override rcvgap if there are packets in deferred queue */
+               if (!skb_queue_empty(&l->deferdq))
+                       rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt;
+               if (rcvgap) {
+                       msg_set_seq_gap(hdr, rcvgap);
+                       l->stats.sent_nacks++;
+               }
+               msg_set_ack(hdr, rcv_last);
+               msg_set_probe(hdr, probe);
+               if (probe)
+                       l->stats.sent_probes++;
+               l->stats.sent_states++;
+       } else {
+               /* RESET_MSG or ACTIVATE_MSG */
+               msg_set_max_pkt(hdr, l->advertised_mtu);
+               msg_set_ack(hdr, l->failover_checkpt - 1);
+               msg_set_next_sent(hdr, 1);
+       }
+       skb = tipc_buf_acquire(msg_size(hdr));
+       if (!skb)
+               return;
+       skb_copy_to_linear_data(skb, hdr, msg_size(hdr));
+       skb->priority = TC_PRIO_CONTROL;
+       __skb_queue_head(xmitq, skb);
+}
 
 /* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
  * a different bearer. Owner node is locked.