tipc: transfer broadcast nacks in link state messages
[cascardo/linux.git] / net / tipc / link.c
index 7d89f87..136316f 100644 (file)
@@ -42,6 +42,7 @@
 #include "name_distr.h"
 #include "discover.h"
 #include "netlink.h"
+#include "monitor.h"
 
 #include <linux/pkt_sched.h>
 
@@ -87,7 +88,6 @@ struct tipc_stats {
  * @peer_bearer_id: bearer id used by link's peer endpoint
  * @bearer_id: local bearer id used by link
  * @tolerance: minimum link continuity loss needed to reset link [in ms]
- * @keepalive_intv: link keepalive timer interval
  * @abort_limit: # of unacknowledged continuity probes needed to reset link
  * @state: current state of link FSM
  * @peer_caps: bitmap describing capabilities of peer node
@@ -96,6 +96,7 @@ struct tipc_stats {
  * @pmsg: convenience pointer to "proto_msg" field
  * @priority: current link priority
  * @net_plane: current link network plane ('A' through 'H')
+ * @mon_state: cookie with information needed by link monitor
  * @backlog_limit: backlog queue congestion thresholds (indexed by importance)
  * @exp_msg_count: # of tunnelled messages expected during link changeover
  * @reset_rcv_checkpt: seq # of last acknowledged message at time of link reset
@@ -131,7 +132,6 @@ struct tipc_link {
        u32 peer_bearer_id;
        u32 bearer_id;
        u32 tolerance;
-       unsigned long keepalive_intv;
        u32 abort_limit;
        u32 state;
        u16 peer_caps;
@@ -140,6 +140,7 @@ struct tipc_link {
        char if_name[TIPC_MAX_IF_NAME];
        u32 priority;
        char net_plane;
+       struct tipc_mon_state mon_state;
        u16 rst_cnt;
 
        /* Failover/synch */
@@ -366,6 +367,18 @@ int tipc_link_bc_peers(struct tipc_link *l)
        return l->ackers;
 }
 
+u16 link_bc_rcv_gap(struct tipc_link *l) /* returns the receive gap seen on link l; 0 when none is detected */
+{
+       struct sk_buff *skb = skb_peek(&l->deferdq); /* head of the deferred queue; tested for NULL below */
+       u16 gap = 0;
+
+       if (more(l->snd_nxt, l->rcv_nxt)) /* callers in this patch store the peer's snd_nxt in l->snd_nxt first */
+               gap = l->snd_nxt - l->rcv_nxt;
+       if (skb) /* a deferred buffer overrides the value above: gap then ends at that buffer's seqno */
+               gap = buf_seqno(skb) - l->rcv_nxt;
+       return gap;
+}
+
 void tipc_link_set_mtu(struct tipc_link *l, int mtu)
 {
        l->mtu = mtu;
@@ -713,18 +726,25 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
        bool setup = false;
        u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
        u16 bc_acked = l->bc_rcvlink->acked;
-
-       link_profile_stats(l);
+       struct tipc_mon_state *mstate = &l->mon_state;
 
        switch (l->state) {
        case LINK_ESTABLISHED:
        case LINK_SYNCHING:
-               if (l->silent_intv_cnt > l->abort_limit)
-                       return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
                mtyp = STATE_MSG;
+               link_profile_stats(l);
+               tipc_mon_get_state(l->net, l->addr, mstate, l->bearer_id);
+               if (mstate->reset || (l->silent_intv_cnt > l->abort_limit))
+                       return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
                state = bc_acked != bc_snt;
-               probe = l->silent_intv_cnt;
-               l->silent_intv_cnt++;
+               state |= l->bc_rcvlink->rcv_unacked;
+               state |= l->rcv_unacked;
+               state |= !skb_queue_empty(&l->transmq);
+               state |= !skb_queue_empty(&l->deferdq);
+               probe = mstate->probing;
+               probe |= l->silent_intv_cnt;
+               if (probe || mstate->monitoring)
+                       l->silent_intv_cnt++;
                break;
        case LINK_RESET:
                setup = l->rst_cnt++ <= 4;
@@ -799,7 +819,7 @@ void link_prepare_wakeup(struct tipc_link *l)
 
        skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
                imp = TIPC_SKB_CB(skb)->chain_imp;
-               lim = l->window + l->backlog[imp].limit;
+               lim = l->backlog[imp].limit;
                pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
                if ((pnd[imp] + l->backlog[imp].len) >= lim)
                        break;
@@ -835,6 +855,7 @@ void tipc_link_reset(struct tipc_link *l)
        l->stats.recv_info = 0;
        l->stale_count = 0;
        l->bc_peer_is_up = false;
+       memset(&l->mon_state, 0, sizeof(l->mon_state));
        tipc_link_reset_stats(l);
 }
 
@@ -864,9 +885,11 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
        struct sk_buff *skb, *_skb, *bskb;
 
        /* Match msg importance against this and all higher backlog limits: */
-       for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
-               if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
-                       return link_schedule_user(l, list);
+       if (!skb_queue_empty(backlogq)) {
+               for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
+                       if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
+                               return link_schedule_user(l, list);
+               }
        }
        if (unlikely(msg_size(hdr) > mtu)) {
                skb_queue_purge(list);
@@ -1124,7 +1147,10 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
                if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
                        return 0;
                l->rcv_unacked = 0;
-               return TIPC_LINK_SND_BC_ACK;
+
+               /* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */
+               l->snd_nxt = l->rcv_nxt;
+               return TIPC_LINK_SND_STATE;
        }
 
        /* Unicast ACK */
@@ -1225,7 +1251,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                        rc |= tipc_link_input(l, skb, l->inputq);
                if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
                        rc |= tipc_link_build_state_msg(l, xmitq);
-               if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
+               if (unlikely(rc & ~TIPC_LINK_SND_STATE))
                        break;
        } while ((skb = __skb_dequeue(defq)));
 
@@ -1239,10 +1265,14 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                                      u16 rcvgap, int tolerance, int priority,
                                      struct sk_buff_head *xmitq)
 {
+       struct tipc_link *bcl = l->bc_rcvlink;
        struct sk_buff *skb;
        struct tipc_msg *hdr;
        struct sk_buff_head *dfq = &l->deferdq;
-       bool node_up = link_is_up(l->bc_rcvlink);
+       bool node_up = link_is_up(bcl);
+       struct tipc_mon_state *mstate = &l->mon_state;
+       int dlen = 0;
+       void *data;
 
        /* Don't send protocol message during reset or link failover */
        if (tipc_link_is_blocked(l))
@@ -1255,18 +1285,19 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                rcvgap = buf_seqno(skb_peek(dfq)) - l->rcv_nxt;
 
        skb = tipc_msg_create(LINK_PROTOCOL, mtyp, INT_H_SIZE,
-                             TIPC_MAX_IF_NAME, l->addr,
+                             tipc_max_domain_size, l->addr,
                              tipc_own_addr(l->net), 0, 0, 0);
        if (!skb)
                return;
 
        hdr = buf_msg(skb);
+       data = msg_data(hdr);
        msg_set_session(hdr, l->session);
        msg_set_bearer_id(hdr, l->bearer_id);
        msg_set_net_plane(hdr, l->net_plane);
        msg_set_next_sent(hdr, l->snd_nxt);
        msg_set_ack(hdr, l->rcv_nxt - 1);
-       msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
+       msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
        msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
        msg_set_link_tolerance(hdr, tolerance);
        msg_set_linkprio(hdr, priority);
@@ -1276,14 +1307,19 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 
        if (mtyp == STATE_MSG) {
                msg_set_seq_gap(hdr, rcvgap);
-               msg_set_size(hdr, INT_H_SIZE);
+               msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
                msg_set_probe(hdr, probe);
+               tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
+               msg_set_size(hdr, INT_H_SIZE + dlen);
+               skb_trim(skb, INT_H_SIZE + dlen);
                l->stats.sent_states++;
                l->rcv_unacked = 0;
        } else {
                /* RESET_MSG or ACTIVATE_MSG */
                msg_set_max_pkt(hdr, l->advertised_mtu);
-               strcpy(msg_data(hdr), l->if_name);
+               strcpy(data, l->if_name);
+               msg_set_size(hdr, INT_H_SIZE + TIPC_MAX_IF_NAME);
+               skb_trim(skb, INT_H_SIZE + TIPC_MAX_IF_NAME);
        }
        if (probe)
                l->stats.sent_probes++;
@@ -1376,7 +1412,9 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
        u16 peers_tol = msg_link_tolerance(hdr);
        u16 peers_prio = msg_linkprio(hdr);
        u16 rcv_nxt = l->rcv_nxt;
+       u16 dlen = msg_data_sz(hdr);
        int mtyp = msg_type(hdr);
+       void *data;
        char *if_name;
        int rc = 0;
 
@@ -1386,6 +1424,10 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
        if (tipc_own_addr(l->net) > msg_prevnode(hdr))
                l->net_plane = msg_net_plane(hdr);
 
+       skb_linearize(skb);
+       hdr = buf_msg(skb);
+       data = msg_data(hdr);
+
        switch (mtyp) {
        case RESET_MSG:
 
@@ -1396,8 +1438,6 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                /* fall thru' */
 
        case ACTIVATE_MSG:
-               skb_linearize(skb);
-               hdr = buf_msg(skb);
 
                /* Complete own link name with peer's interface name */
                if_name =  strrchr(l->name, ':') + 1;
@@ -1405,7 +1445,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                        break;
                if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
                        break;
-               strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME);
+               strncpy(if_name, data, TIPC_MAX_IF_NAME);
 
                /* Update own tolerance if peer indicates a non-zero value */
                if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
@@ -1453,6 +1493,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
                                rc = TIPC_LINK_UP_EVT;
                        break;
                }
+               tipc_mon_rcv(l->net, data, dlen, l->addr,
+                            &l->mon_state, l->bearer_id);
 
                /* Send NACK if peer has sent pkts we haven't received yet */
                if (more(peers_snd_nxt, rcv_nxt) && !tipc_link_is_synching(l))
@@ -1550,49 +1592,68 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
 
 /* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
  */
-void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
-                          struct sk_buff_head *xmitq)
+int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
+                         struct sk_buff_head *xmitq) /* returns rc flags (retransmit result and/or TIPC_LINK_SND_STATE), or 0 */
 {
        u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
+       u16 from = msg_bcast_ack(hdr) + 1; /* first bcast seqno after the one the peer acknowledged */
+       u16 to = from + msg_bc_gap(hdr) - 1; /* last seqno of the peer's reported gap; equals from - 1 when bc_gap is 0 */
+       int rc = 0; /* stays 0 on every early return before the retransmit step */
 
        if (!link_is_up(l))
-               return;
+               return rc;
 
        if (!msg_peer_node_is_up(hdr))
-               return;
+               return rc;
 
        /* Open when peer ackowledges our bcast init msg (pkt #1) */
        if (msg_ack(hdr))
                l->bc_peer_is_up = true;
 
        if (!l->bc_peer_is_up)
-               return;
+               return rc;
 
        /* Ignore if peers_snd_nxt goes beyond receive window */
        if (more(peers_snd_nxt, l->rcv_nxt + l->window))
-               return;
+               return rc;
+
+       if (!less(to, from)) { /* presumably skipped when bc_gap is 0 (to == from - 1) - confirm less() semantics */
+               rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq); /* resend [from, to] on the bcast send link */
+               l->stats.recv_nacks++;
+       }
+
+       l->snd_nxt = peers_snd_nxt; /* link_bc_rcv_gap() compares l->snd_nxt against l->rcv_nxt */
+       if (link_bc_rcv_gap(l))
+               rc |= TIPC_LINK_SND_STATE; /* a local receive gap exists: flag TIPC_LINK_SND_STATE to the caller */
+
+       /* Return now if sender supports nack via STATE messages */
+       if (l->peer_caps & TIPC_BCAST_STATE_NACK)
+               return rc;
+
+       /* Otherwise, be backwards compatible */
 
        if (!more(peers_snd_nxt, l->rcv_nxt)) {
                l->nack_state = BC_NACK_SND_CONDITIONAL;
-               return;
+               return 0; /* NOTE(review): backwards-compatible path; rc computed above is discarded here */
        }
 
        /* Don't NACK if one was recently sent or peeked */
        if (l->nack_state == BC_NACK_SND_SUPPRESS) {
                l->nack_state = BC_NACK_SND_UNCONDITIONAL;
-               return;
+               return 0;
        }
 
        /* Conditionally delay NACK sending until next synch rcv */
        if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
                l->nack_state = BC_NACK_SND_UNCONDITIONAL;
                if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
-                       return;
+                       return 0;
        }
 
        /* Send NACK now but suppress next one */
        tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
        l->nack_state = BC_NACK_SND_SUPPRESS;
+       return 0; /* NACK was queued via tipc_link_build_bc_proto_msg(); no flags returned */
 }
 
 void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
@@ -1629,6 +1690,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
 }
 
 /* tipc_link_bc_nack_rcv(): receive broadcast nack message
+ * This function is kept only for backwards compatibility, since
+ * BCAST_PROTOCOL/STATE messages are no longer sent as of TIPC v2.5.
  */
 int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
                          struct sk_buff_head *xmitq)
@@ -1669,10 +1732,10 @@ void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
        int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
 
        l->window = win;
-       l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2;
-       l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win;
-       l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3;
-       l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
+       l->backlog[TIPC_LOW_IMPORTANCE].limit      = max_t(u16, 50, win);
+       l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = max_t(u16, 100, win * 2);
+       l->backlog[TIPC_HIGH_IMPORTANCE].limit     = max_t(u16, 150, win * 3);
+       l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = max_t(u16, 200, win * 4);
        l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
 }