tipc: improve link FSM implementation
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 16 Jul 2015 20:54:27 +0000 (16:54 -0400)
committerDavid S. Miller <davem@davemloft.net>
Tue, 21 Jul 2015 03:41:15 +0000 (20:41 -0700)
The link FSM implementation is currently unnecessarily complex.
It sometimes checks for conditional state outside the FSM data
before deciding next state, and often performs actions directly
inside the FSM logics.

In this commit, we create a second, simpler FSM implementation,
that as far as possible acts only on states and events that it is
strictly defined for, and postpone any actions until it is finished
with its decisions. It also returns an event flag field and an a
buffer queue which may potentially contain a protocol message to
be sent by the caller.

Unfortunately, we cannot yet make the FSM "clean", in the sense
that its decisions are only based on FSM state and event, and that
state changes happen only here. That will have to wait until the
activate/reset logics has been cleaned up in a future commit.

We also rename the link states as follows:

WORKING_WORKING -> TIPC_LINK_WORKING
WORKING_UNKNOWN -> TIPC_LINK_PROBING
RESET_UNKNOWN   -> TIPC_LINK_RESETTING
RESET_RESET     -> TIPC_LINK_ESTABLISHING

The existing FSM function, link_state_event(), is still needed for
a while, so we redesign it to make use of the new function.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c
net/tipc/link.h

index 657ba91..5d2f919 100644 (file)
@@ -88,10 +88,10 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
 /* Link FSM states and events:
  */
 enum {
-       WORKING_WORKING,
-       WORKING_UNKNOWN,
-       RESET_RESET,
-       RESET_UNKNOWN
+       TIPC_LINK_WORKING,
+       TIPC_LINK_PROBING,
+       TIPC_LINK_RESETTING,
+       TIPC_LINK_ESTABLISHING
 };
 
 enum {
@@ -103,24 +103,24 @@ enum {
 
 /* Link FSM state checking routines
  */
-static int link_working_working(struct tipc_link *l)
+static int link_working(struct tipc_link *l)
 {
-       return l->state == WORKING_WORKING;
+       return l->state == TIPC_LINK_WORKING;
 }
 
-static int link_working_unknown(struct tipc_link *l)
+static int link_probing(struct tipc_link *l)
 {
-       return l->state == WORKING_UNKNOWN;
+       return l->state == TIPC_LINK_PROBING;
 }
 
-static int link_reset_unknown(struct tipc_link *l)
+static int link_resetting(struct tipc_link *l)
 {
-       return l->state == RESET_UNKNOWN;
+       return l->state == TIPC_LINK_RESETTING;
 }
 
-static int link_reset_reset(struct tipc_link *l)
+static int link_establishing(struct tipc_link *l)
 {
-       return l->state == RESET_RESET;
+       return l->state == TIPC_LINK_ESTABLISHING;
 }
 
 static void link_handle_out_of_seq_msg(struct tipc_link *link,
@@ -140,6 +140,8 @@ static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
 static void link_set_timer(struct tipc_link *link, unsigned long time);
+static void link_activate(struct tipc_link *link);
+
 /*
  *  Simple link routines
  */
@@ -179,7 +181,7 @@ int tipc_link_is_up(struct tipc_link *l_ptr)
 {
        if (!l_ptr)
                return 0;
-       return link_working_working(l_ptr) || link_working_unknown(l_ptr);
+       return link_working(l_ptr) || link_probing(l_ptr);
 }
 
 int tipc_link_is_active(struct tipc_link *l)
@@ -234,8 +236,11 @@ static void link_timeout(unsigned long data)
        }
 
        /* do all other link processing performed on a periodic basis */
-       if (l_ptr->silent_intv_cnt || tipc_bclink_acks_missing(l_ptr->owner))
+       if (l_ptr->silent_intv_cnt)
                link_state_event(l_ptr, SILENCE_EVT);
+       else if (link_working(l_ptr) && tipc_bclink_acks_missing(l_ptr->owner))
+               tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
+
        l_ptr->silent_intv_cnt++;
        if (skb_queue_len(&l_ptr->backlogq))
                tipc_link_push_packets(l_ptr);
@@ -304,7 +309,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
        l_ptr->peer_session = WILDCARD_SESSION;
        l_ptr->bearer_id = b_ptr->identity;
        link_set_supervision_props(l_ptr, b_ptr->tolerance);
-       l_ptr->state = RESET_UNKNOWN;
+       l_ptr->state = TIPC_LINK_RESETTING;
 
        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
        msg = l_ptr->pmsg;
@@ -366,6 +371,134 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
        rcu_read_unlock();
 }
 
+/**
+ * tipc_link_fsm_evt - link finite state machine
+ * @l: pointer to link
+ * @evt: state machine event to be processed
+ * @xmitq: queue to prepend created protocol message, if any
+ */
+static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
+                            struct sk_buff_head *xmitq)
+{
+       int mtyp = 0, rc = 0;
+       struct tipc_link *pl;
+       enum {
+               LINK_RESET    = 1,
+               LINK_ACTIVATE = (1 << 1),
+               SND_PROBE     = (1 << 2),
+               SND_STATE     = (1 << 3),
+               SND_RESET     = (1 << 4),
+               SND_ACTIVATE  = (1 << 5)
+       } actions = 0;
+
+       if (l->exec_mode == TIPC_LINK_BLOCKED)
+               return rc;
+
+       switch (l->state) {
+       case TIPC_LINK_WORKING:
+               switch (evt) {
+               case TRAFFIC_EVT:
+               case ACTIVATE_EVT:
+                       break;
+               case SILENCE_EVT:
+                       l->state = TIPC_LINK_PROBING;
+                       actions |= SND_PROBE;
+                       break;
+               case PEER_RESET_EVT:
+                       actions |= LINK_RESET | SND_ACTIVATE;
+                       break;
+               default:
+                       pr_debug("%s%u WORKING\n", link_unk_evt, evt);
+               }
+               break;
+       case TIPC_LINK_PROBING:
+               switch (evt) {
+               case TRAFFIC_EVT:
+               case ACTIVATE_EVT:
+                       l->state = TIPC_LINK_WORKING;
+                       break;
+               case PEER_RESET_EVT:
+                       actions |= LINK_RESET | SND_ACTIVATE;
+                       break;
+               case SILENCE_EVT:
+                       if (l->silent_intv_cnt <= l->abort_limit) {
+                               actions |= SND_PROBE;
+                               break;
+                       }
+                       actions |= LINK_RESET | SND_RESET;
+                       break;
+               default:
+                       pr_err("%s%u PROBING\n", link_unk_evt, evt);
+               }
+               break;
+       case TIPC_LINK_RESETTING:
+               switch (evt) {
+               case TRAFFIC_EVT:
+                       break;
+               case ACTIVATE_EVT:
+                       pl = node_active_link(l->owner, 0);
+                       if (pl && link_probing(pl))
+                               break;
+                       actions |= LINK_ACTIVATE;
+                       if (l->owner->working_links == 1)
+                               tipc_link_sync_xmit(l);
+                       break;
+               case PEER_RESET_EVT:
+                       l->state = TIPC_LINK_ESTABLISHING;
+                       actions |= SND_ACTIVATE;
+                       break;
+               case SILENCE_EVT:
+                       actions |= SND_RESET;
+                       break;
+               default:
+                       pr_err("%s%u in RESETTING\n", link_unk_evt, evt);
+               }
+               break;
+       case TIPC_LINK_ESTABLISHING:
+               switch (evt) {
+               case TRAFFIC_EVT:
+               case ACTIVATE_EVT:
+                       pl = node_active_link(l->owner, 0);
+                       if (pl && link_probing(pl))
+                               break;
+                       actions |= LINK_ACTIVATE;
+                       if (l->owner->working_links == 1)
+                               tipc_link_sync_xmit(l);
+                       break;
+               case PEER_RESET_EVT:
+                       break;
+               case SILENCE_EVT:
+                       actions |= SND_ACTIVATE;
+                       break;
+               default:
+                       pr_err("%s%u ESTABLISHING\n", link_unk_evt, evt);
+               }
+               break;
+       default:
+               pr_err("Unknown link state %u/%u\n", l->state, evt);
+       }
+
+       /* Perform actions as decided by FSM */
+       if (actions & LINK_RESET) {
+               l->exec_mode = TIPC_LINK_BLOCKED;
+               rc |= TIPC_LINK_DOWN_EVT;
+       }
+       if (actions & LINK_ACTIVATE) {
+               l->exec_mode = TIPC_LINK_OPEN;
+               rc |= TIPC_LINK_UP_EVT;
+       }
+       if (actions & (SND_STATE | SND_PROBE))
+               mtyp = STATE_MSG;
+       if (actions & SND_RESET)
+               mtyp = RESET_MSG;
+       if (actions & SND_ACTIVATE)
+               mtyp = ACTIVATE_MSG;
+       if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
+               tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
+                                         0, 0, 0, xmitq);
+       return rc;
+}
+
 /**
  * link_schedule_user - schedule a message sender for wakeup after congestion
  * @link: congested link
@@ -474,9 +607,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        /* Prepare for renewed mtu size negotiation */
        l_ptr->mtu = l_ptr->advertised_mtu;
 
-       l_ptr->state = RESET_UNKNOWN;
+       l_ptr->state = TIPC_LINK_RESETTING;
 
-       if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
+       if ((prev_state == TIPC_LINK_RESETTING) ||
+           (prev_state == TIPC_LINK_ESTABLISHING))
                return;
 
        tipc_node_link_down(l_ptr->owner, l_ptr->bearer_id);
@@ -515,6 +649,8 @@ static void link_activate(struct tipc_link *link)
        link->rcv_nxt = 1;
        link->stats.recv_info = 1;
        link->silent_intv_cnt = 0;
+       link->state = TIPC_LINK_WORKING;
+       link->exec_mode = TIPC_LINK_OPEN;
        tipc_node_link_up(node, link->bearer_id);
        tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
 }
@@ -524,132 +660,29 @@ static void link_activate(struct tipc_link *link)
  * @l_ptr: pointer to link
  * @event: state machine event to process
  */
-static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
+static void link_state_event(struct tipc_link *l, unsigned int evt)
 {
-       struct tipc_link *other;
+       int rc;
+       struct sk_buff_head xmitq;
+       struct sk_buff *skb;
 
-       if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
+       if (l->exec_mode == TIPC_LINK_BLOCKED)
                return;
 
-       switch (l_ptr->state) {
-       case WORKING_WORKING:
-               switch (event) {
-               case TRAFFIC_EVT:
-               case ACTIVATE_MSG:
-                       l_ptr->silent_intv_cnt = 0;
-                       break;
-               case SILENCE_EVT:
-                       if (!l_ptr->silent_intv_cnt) {
-                               if (tipc_bclink_acks_missing(l_ptr->owner))
-                                       tipc_link_proto_xmit(l_ptr, STATE_MSG,
-                                                            0, 0, 0, 0);
-                               break;
-                       }
-                       l_ptr->state = WORKING_UNKNOWN;
-                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
-                       break;
-               case RESET_MSG:
-                       pr_debug("%s<%s>, requested by peer\n",
-                                link_rst_msg, l_ptr->name);
-                       tipc_link_reset(l_ptr);
-                       l_ptr->state = RESET_RESET;
-                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-                                            0, 0, 0, 0);
-                       break;
-               default:
-                       pr_debug("%s%u in WW state\n", link_unk_evt, event);
-               }
-               break;
-       case WORKING_UNKNOWN:
-               switch (event) {
-               case TRAFFIC_EVT:
-               case ACTIVATE_MSG:
-                       l_ptr->state = WORKING_WORKING;
-                       l_ptr->silent_intv_cnt = 0;
-                       break;
-               case RESET_MSG:
-                       pr_debug("%s<%s>, requested by peer while probing\n",
-                                link_rst_msg, l_ptr->name);
-                       tipc_link_reset(l_ptr);
-                       l_ptr->state = RESET_RESET;
-                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-                                            0, 0, 0, 0);
-                       break;
-               case SILENCE_EVT:
-                       if (!l_ptr->silent_intv_cnt) {
-                               l_ptr->state = WORKING_WORKING;
-                               if (tipc_bclink_acks_missing(l_ptr->owner))
-                                       tipc_link_proto_xmit(l_ptr, STATE_MSG,
-                                                            0, 0, 0, 0);
-                       } else if (l_ptr->silent_intv_cnt <
-                                  l_ptr->abort_limit) {
-                               tipc_link_proto_xmit(l_ptr, STATE_MSG,
-                                                    1, 0, 0, 0);
-                       } else {        /* Link has failed */
-                               pr_debug("%s<%s>, peer not responding\n",
-                                        link_rst_msg, l_ptr->name);
-                               tipc_link_reset(l_ptr);
-                               l_ptr->state = RESET_UNKNOWN;
-                               tipc_link_proto_xmit(l_ptr, RESET_MSG,
-                                                    0, 0, 0, 0);
-                       }
-                       break;
-               default:
-                       pr_err("%s%u in WU state\n", link_unk_evt, event);
-               }
-               break;
-       case RESET_UNKNOWN:
-               switch (event) {
-               case TRAFFIC_EVT:
-                       break;
-               case ACTIVATE_MSG:
-                       other = node_active_link(l_ptr->owner, 0);
-                       if (other && link_working_unknown(other))
-                               break;
-                       l_ptr->state = WORKING_WORKING;
-                       link_activate(l_ptr);
-                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
-                       if (l_ptr->owner->working_links == 1)
-                               tipc_link_sync_xmit(l_ptr);
-                       break;
-               case RESET_MSG:
-                       l_ptr->state = RESET_RESET;
-                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-                                            1, 0, 0, 0);
-                       break;
-               case SILENCE_EVT:
-                       tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0);
-                       break;
-               default:
-                       pr_err("%s%u in RU state\n", link_unk_evt, event);
-               }
-               break;
-       case RESET_RESET:
-               switch (event) {
-               case TRAFFIC_EVT:
-               case ACTIVATE_MSG:
-                       other = node_active_link(l_ptr->owner, 0);
-                       if (other && link_working_unknown(other))
-                               break;
-                       l_ptr->state = WORKING_WORKING;
-                       link_activate(l_ptr);
-                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
-                       if (l_ptr->owner->working_links == 1)
-                               tipc_link_sync_xmit(l_ptr);
-                       break;
-               case RESET_MSG:
-                       break;
-               case SILENCE_EVT:
-                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
-                                            0, 0, 0, 0);
-                       break;
-               default:
-                       pr_err("%s%u in RR state\n", link_unk_evt, event);
-               }
-               break;
-       default:
-               pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
-       }
+       __skb_queue_head_init(&xmitq);
+
+       rc = tipc_link_fsm_evt(l, evt, &xmitq);
+
+       if (rc & TIPC_LINK_UP_EVT)
+               link_activate(l);
+
+       if (rc & TIPC_LINK_DOWN_EVT)
+               tipc_link_reset(l);
+
+       skb = __skb_dequeue(&xmitq);
+       if (!skb)
+               return;
+       tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
 }
 
 /**
@@ -1102,7 +1135,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
                        link_prepare_wakeup(l_ptr);
 
                /* Process the incoming packet */
-               if (unlikely(!link_working_working(l_ptr))) {
+               if (unlikely(!link_working(l_ptr))) {
                        if (msg_user(msg) == LINK_PROTOCOL) {
                                tipc_link_proto_rcv(l_ptr, skb);
                                link_retrieve_defq(l_ptr, &head);
@@ -1113,7 +1146,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
                        /* Traffic message. Conditionally activate link */
                        link_state_event(l_ptr, TRAFFIC_EVT);
 
-                       if (link_working_working(l_ptr)) {
+                       if (link_working(l_ptr)) {
                                /* Re-insert buffer in front of queue */
                                __skb_queue_head(&head, skb);
                                skb = NULL;
@@ -1122,7 +1155,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
                        goto unlock;
                }
 
-               /* Link is now in state WORKING_WORKING */
+               /* Link is now in state TIPC_LINK_WORKING */
                if (unlikely(seq_no != l_ptr->rcv_nxt)) {
                        link_handle_out_of_seq_msg(l_ptr, skb);
                        link_retrieve_defq(l_ptr, &head);
@@ -1365,16 +1398,15 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
        switch (msg_type(msg)) {
 
        case RESET_MSG:
-               if (!link_working_unknown(l_ptr) &&
+               if (!link_probing(l_ptr) &&
                    (l_ptr->peer_session != WILDCARD_SESSION)) {
                        if (less_eq(msg_session(msg), l_ptr->peer_session))
                                break; /* duplicate or old reset: ignore */
                }
 
-               if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
-                               link_working_unknown(l_ptr))) {
-                       /*
-                        * peer has lost contact -- don't allow peer's links
+               if (!msg_redundant_link(msg) && (link_working(l_ptr) ||
+                                                link_probing(l_ptr))) {
+                       /* peer has lost contact -- don't allow peer's links
                         * to reactivate before we recognize loss & clean up
                         */
                        l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
@@ -1432,7 +1464,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 
                link_state_event(l_ptr, TRAFFIC_EVT);
                l_ptr->stats.recv_states++;
-               if (link_reset_unknown(l_ptr))
+               if (link_resetting(l_ptr))
                        break;
 
                if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
@@ -1822,14 +1854,14 @@ static void link_print(struct tipc_link *l_ptr, const char *str)
                pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
        rcu_read_unlock();
 
-       if (link_working_unknown(l_ptr))
-               pr_cont(":WU\n");
-       else if (link_reset_reset(l_ptr))
-               pr_cont(":RR\n");
-       else if (link_reset_unknown(l_ptr))
-               pr_cont(":RU\n");
-       else if (link_working_working(l_ptr))
-               pr_cont(":WW\n");
+       if (link_probing(l_ptr))
+               pr_cont(":P\n");
+       else if (link_establishing(l_ptr))
+               pr_cont(":E\n");
+       else if (link_resetting(l_ptr))
+               pr_cont(":R\n");
+       else if (link_working(l_ptr))
+               pr_cont(":W\n");
        else
                pr_cont("\n");
 }
index 0509c6d..ef68424 100644 (file)
@@ -58,6 +58,13 @@ enum {
        TIPC_LINK_TUNNEL
 };
 
+/* Events occurring at packet reception or at timeout
+ */
+enum {
+       TIPC_LINK_UP_EVT       = 1,
+       TIPC_LINK_DOWN_EVT     = (1 << 1)
+};
+
 /* Starting value for maximum packet size negotiation on unicast links
  * (unless bearer MTU is less)
  */