ofproto-dpif: Use fat_rwlock instead of ovs_rwlock.
diff --git a/ofproto/ofproto-dpif-xlate.c b/ofproto/ofproto-dpif-xlate.c
index b8e8084..c5dc9e8 100644
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -51,6 +51,7 @@
 
 COVERAGE_DEFINE(xlate_actions);
 COVERAGE_DEFINE(xlate_actions_oversize);
+COVERAGE_DEFINE(xlate_actions_too_many_output);
 COVERAGE_DEFINE(xlate_actions_mpls_overflow);
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_xlate);
@@ -58,12 +59,14 @@ VLOG_DEFINE_THIS_MODULE(ofproto_dpif_xlate);
 /* Maximum depth of flow table recursion (due to resubmit actions) in a
  * flow translation. */
 #define MAX_RESUBMIT_RECURSION 64
+#define MAX_INTERNAL_RESUBMITS 1   /* Max resubmits allowed using rules in
+                                      the internal table. */
 
 /* Maximum number of resubmit actions in a flow translation, whether they are
  * recursive or not. */
 #define MAX_RESUBMITS (MAX_RESUBMIT_RECURSION * MAX_RESUBMIT_RECURSION)
 
-struct ovs_rwlock xlate_rwlock = OVS_RWLOCK_INITIALIZER;
+struct fat_rwlock xlate_rwlock;
 
 struct xbridge {
     struct hmap_node hmap_node;   /* Node in global 'xbridges' map. */
@@ -89,6 +92,9 @@ struct xbridge {
     bool has_in_band;             /* Bridge has in band control? */
     bool forward_bpdu;            /* Bridge forwards STP BPDUs? */
 
+    /* True if the datapath supports recirculation. */
+    bool enable_recirc;
+
     /* True if the datapath supports variable-length
      * OVS_USERSPACE_ATTR_USERDATA in OVS_ACTION_ATTR_USERSPACE actions.
      * False if the datapath supports only 8-byte (or shorter) userdata. */
@@ -187,6 +193,10 @@ struct xlate_ctx {
     uint16_t user_cookie_offset;/* Used for user_action_cookie fixup. */
     bool exit;                  /* No further actions should be processed. */
 
+    bool use_recirc;            /* Should generate recirc? */
+    struct xlate_recirc recirc; /* Information used for generating
+                                 * recirculation actions. */
+
     /* OpenFlow 1.1+ action set.
      *
      * 'action_set' accumulates "struct ofpact"s added by OFPACT_WRITE_ACTIONS.
@@ -217,6 +227,73 @@ struct skb_priority_to_dscp {
     uint8_t dscp;               /* DSCP bits to mark outgoing traffic with. */
 };
 
+enum xc_type {
+    XC_RULE,
+    XC_BOND,
+    XC_NETDEV,
+    XC_NETFLOW,
+    XC_MIRROR,
+    XC_LEARN,
+    XC_NORMAL,
+    XC_FIN_TIMEOUT,
+};
+
+/* xlate_cache entries hold enough information to perform the side effects of
+ * xlate_actions() for a rule, without needing to perform rule translation
+ * from scratch. The primary usage of these is to submit statistics to objects
+ * that a flow relates to, although they may be used for other effects as well
+ * (for instance, refreshing hard timeouts for learned flows). */
+struct xc_entry {
+    enum xc_type type;
+    union {
+        struct rule_dpif *rule;
+        struct {
+            struct netdev *tx;
+            struct netdev *rx;
+            struct bfd *bfd;
+        } dev;
+        struct {
+            struct netflow *netflow;
+            struct flow *flow;
+            ofp_port_t iface;
+        } nf;
+        struct {
+            struct mbridge *mbridge;
+            mirror_mask_t mirrors;
+        } mirror;
+        struct {
+            struct bond *bond;
+            struct flow *flow;
+            uint16_t vid;
+        } bond;
+        struct {
+            struct ofproto_dpif *ofproto;
+            struct ofputil_flow_mod *fm;
+            struct ofpbuf *ofpacts;
+        } learn;
+        struct {
+            struct ofproto_dpif *ofproto;
+            struct flow *flow;
+            int vlan;
+        } normal;
+        struct {
+            struct rule_dpif *rule;
+            uint16_t idle;
+            uint16_t hard;
+        } fin;
+    } u;
+};
+
+#define XC_ENTRY_FOR_EACH(entry, entries, xcache)               \
+    entries = xcache->entries;                                  \
+    for (entry = ofpbuf_try_pull(&entries, sizeof *entry);      \
+         entry;                                                 \
+         entry = ofpbuf_try_pull(&entries, sizeof *entry))
+
+struct xlate_cache {
+    struct ofpbuf entries;
+};
+
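
The cache is a single 'ofpbuf' holding fixed-size entries back to back, and
XC_ENTRY_FOR_EACH walks it by repeatedly pulling one entry's worth of bytes.
A sketch of what the macro expands to, using only the ofpbuf API already
referenced in this file:

    /* 'entries' is a by-value copy of the cache's ofpbuf, so pulling from
     * it only advances the copy's data pointer; 'xcache' is unchanged. */
    struct ofpbuf entries = xcache->entries;
    struct xc_entry *entry;

    for (entry = ofpbuf_try_pull(&entries, sizeof *entry);
         entry;
         entry = ofpbuf_try_pull(&entries, sizeof *entry)) {
        /* ... dispatch on entry->type, then use entry->u ... */
    }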
 static struct hmap xbridges = HMAP_INITIALIZER(&xbridges);
 static struct hmap xbundles = HMAP_INITIALIZER(&xbundles);
 static struct hmap xports = HMAP_INITIALIZER(&xports);
@@ -226,8 +303,8 @@ static void do_xlate_actions(const struct ofpact *, size_t ofpacts_len,
                              struct xlate_ctx *);
 static void xlate_actions__(struct xlate_in *, struct xlate_out *)
     OVS_REQ_RDLOCK(xlate_rwlock);
-    static void xlate_normal(struct xlate_ctx *);
-    static void xlate_report(struct xlate_ctx *, const char *);
+static void xlate_normal(struct xlate_ctx *);
+static void xlate_report(struct xlate_ctx *, const char *);
 static void xlate_table_action(struct xlate_ctx *, ofp_port_t in_port,
                                uint8_t table_id, bool may_packet_in,
                                bool honor_table_miss);
@@ -247,6 +324,9 @@ static void clear_skb_priorities(struct xport *);
 static bool dscp_from_skb_priority(const struct xport *, uint32_t skb_priority,
                                    uint8_t *dscp);
 
+static struct xc_entry *xlate_cache_add_entry(struct xlate_cache *xc,
+                                              enum xc_type type);
+
 void
 xlate_ofproto_set(struct ofproto_dpif *ofproto, const char *name,
                   struct dpif *dpif, struct rule_dpif *miss_rule,
@@ -257,6 +337,7 @@ xlate_ofproto_set(struct ofproto_dpif *ofproto, const char *name,
                   const struct dpif_ipfix *ipfix,
                   const struct netflow *netflow, enum ofp_config_flags frag,
                   bool forward_bpdu, bool has_in_band,
+                  bool enable_recirc,
                   bool variable_length_userdata,
                   size_t max_mpls_depth)
 {
@@ -310,6 +391,7 @@ xlate_ofproto_set(struct ofproto_dpif *ofproto, const char *name,
     xbridge->frag = frag;
     xbridge->miss_rule = miss_rule;
     xbridge->no_packet_in_rule = no_packet_in_rule;
+    xbridge->enable_recirc = enable_recirc;
     xbridge->variable_length_userdata = variable_length_userdata;
     xbridge->max_mpls_depth = max_mpls_depth;
 }
@@ -552,10 +634,12 @@ xlate_receive(const struct dpif_backer *backer, struct ofpbuf *packet,
               struct dpif_sflow **sflow, struct netflow **netflow,
               odp_port_t *odp_in_port)
 {
+    struct ofproto_dpif *recv_ofproto = NULL;
+    struct ofproto_dpif *recirc_ofproto = NULL;
     const struct xport *xport;
     int error = ENODEV;
 
-    ovs_rwlock_rdlock(&xlate_rwlock);
+    fat_rwlock_rdlock(&xlate_rwlock);
     if (odp_flow_key_to_flow(key, key_len, flow) == ODP_FIT_ERROR) {
         error = EINVAL;
         goto exit;
@@ -573,6 +657,7 @@ xlate_receive(const struct dpif_backer *backer, struct ofpbuf *packet,
     if (!xport) {
         goto exit;
     }
+    recv_ofproto = xport->xbridge->ofproto;
 
     if (vsp_adjust_flow(xport->xbridge->ofproto, flow)) {
         if (packet) {
@@ -585,24 +670,58 @@ xlate_receive(const struct dpif_backer *backer, struct ofpbuf *packet,
     }
     error = 0;
 
+    /* When recirc_id is set in 'flow', checks whether the ofproto_dpif that
+     * corresponds to the recirc_id is the same as the receiving bridge.  If
+     * they are the same, uses the 'recv_ofproto' and keeps the 'ofp_in_port'
+     * as assigned.  Otherwise, uses the 'recirc_ofproto' that owns the
+     * recirc_id and assigns OFPP_NONE to 'ofp_in_port'.  The reason is that
+     * the recirculated flow must be processed by the ofproto that originated
+     * the recirculation, and since bridges can only see their own ports, the
+     * in_port of the 'recv_ofproto' should not be passed to the
+     * 'recirc_ofproto'.
+     *
+     * Admittedly, setting the 'ofp_in_port' to OFPP_NONE prevents the
+     * 'recirc_ofproto' from meaningfully matching on the in_port of the
+     * recirculated flow; this should be fixed in the near future.
+     *
+     * TODO: Restore the original patch port.
+     */
+    if (flow->recirc_id) {
+        recirc_ofproto = ofproto_dpif_recirc_get_ofproto(backer,
+                                                         flow->recirc_id);
+        /* Return an error if the recirculation bridge cannot be found. */
+        if (!recirc_ofproto) {
+            error = ENOENT;
+            goto exit;
+        }
+
+        if (recv_ofproto != recirc_ofproto) {
+            xport = NULL;
+            flow->in_port.ofp_port = OFPP_NONE;
+            if (odp_in_port) {
+                *odp_in_port = ODPP_NONE;
+            }
+        }
+    }
+
     if (ofproto) {
-        *ofproto = xport->xbridge->ofproto;
+        *ofproto = xport ? recv_ofproto : recirc_ofproto;
     }
 
     if (ipfix) {
-        *ipfix = dpif_ipfix_ref(xport->xbridge->ipfix);
+        *ipfix = xport ? dpif_ipfix_ref(xport->xbridge->ipfix) : NULL;
     }
 
     if (sflow) {
-        *sflow = dpif_sflow_ref(xport->xbridge->sflow);
+        *sflow = xport ? dpif_sflow_ref(xport->xbridge->sflow) : NULL;
     }
 
     if (netflow) {
-        *netflow = netflow_ref(xport->xbridge->netflow);
+        *netflow = xport ? netflow_ref(xport->xbridge->netflow) : NULL;
     }
 
 exit:
-    ovs_rwlock_unlock(&xlate_rwlock);
+    fat_rwlock_unlock(&xlate_rwlock);
     return error;
 }
 
@@ -683,10 +802,10 @@ xport_stp_forward_state(const struct xport *xport)
 }
 
 static bool
-xport_stp_listen_state(const struct xport *xport)
+xport_stp_should_forward_bpdu(const struct xport *xport)
 {
     struct stp_port *sp = xport_get_stp_port(xport);
-    return stp_listen_in_state(sp ? stp_port_get_state(sp) : STP_DISABLED);
+    return stp_should_forward_bpdu(sp ? stp_port_get_state(sp) : STP_DISABLED);
 }
 
 /* Returns true if STP should process 'flow'.  Sets fields in 'wc' that
@@ -694,8 +813,9 @@ xport_stp_listen_state(const struct xport *xport)
 static bool
 stp_should_process_flow(const struct flow *flow, struct flow_wildcards *wc)
 {
+    /* is_stp() also checks dl_type, but dl_type is always set in 'wc'. */
     memset(&wc->masks.dl_dst, 0xff, sizeof wc->masks.dl_dst);
-    return eth_addr_equals(flow->dl_dst, eth_addr_stp);
+    return is_stp(flow);
 }
 
 static void
@@ -894,9 +1014,10 @@ lookup_input_bundle(const struct xbridge *xbridge, ofp_port_t in_port,
         return xport->xbundle;
     }
 
-    /* Special-case OFPP_NONE, which a controller may use as the ingress
-     * port for traffic that it is sourcing. */
-    if (in_port == OFPP_NONE) {
+    /* Special-case OFPP_NONE (OF1.0) and OFPP_CONTROLLER (OF1.1+),
+     * which a controller may use as the ingress port for traffic that
+     * it is sourcing. */
+    if (in_port == OFPP_CONTROLLER || in_port == OFPP_NONE) {
         return &ofpp_none_bundle;
     }
 
@@ -1131,13 +1252,30 @@ output_normal(struct xlate_ctx *ctx, const struct xbundle *out_xbundle,
         /* Partially configured bundle with no slaves.  Drop the packet. */
         return;
     } else if (!out_xbundle->bond) {
+        ctx->use_recirc = false;
         xport = CONTAINER_OF(list_front(&out_xbundle->xports), struct xport,
                              bundle_node);
     } else {
         struct ofport_dpif *ofport;
+        struct xlate_recirc *xr = &ctx->recirc;
+        struct flow_wildcards *wc = &ctx->xout->wc;
+
+        if (ctx->xbridge->enable_recirc) {
+            ctx->use_recirc = bond_may_recirc(
+                out_xbundle->bond, &xr->recirc_id, &xr->hash_basis);
+
+            if (ctx->use_recirc) {
+                /* Only TCP mode uses recirculation. */
+                xr->hash_alg = OVS_HASH_ALG_L4;
+                bond_update_post_recirc_rules(out_xbundle->bond, false);
+
+                /* Recirculation does not require unmasking hash fields. */
+                wc = NULL;
+            }
+        }
 
-        ofport = bond_choose_output_slave(out_xbundle->bond, &ctx->xin->flow,
-                                          &ctx->xout->wc, vid);
+        ofport = bond_choose_output_slave(out_xbundle->bond,
+                                          &ctx->xin->flow, wc, vid);
         xport = xport_lookup(ofport);
 
         if (!xport) {
@@ -1145,9 +1283,23 @@ output_normal(struct xlate_ctx *ctx, const struct xbundle *out_xbundle,
             return;
         }
 
-        if (ctx->xin->resubmit_stats) {
-            bond_account(out_xbundle->bond, &ctx->xin->flow, vid,
-                         ctx->xin->resubmit_stats->n_bytes);
+        /* If ctx->use_recirc is set, the main thread will handle stats
+         * accounting for this bond. */
+        if (!ctx->use_recirc) {
+            if (ctx->xin->resubmit_stats) {
+                bond_account(out_xbundle->bond, &ctx->xin->flow, vid,
+                             ctx->xin->resubmit_stats->n_bytes);
+            }
+            if (ctx->xin->xcache) {
+                struct xc_entry *entry;
+                struct flow *flow;
+
+                flow = &ctx->xin->flow;
+                entry = xlate_cache_add_entry(ctx->xin->xcache, XC_BOND);
+                entry->u.bond.bond = bond_ref(out_xbundle->bond);
+                entry->u.bond.flow = xmemdup(flow, sizeof *flow);
+                entry->u.bond.vid = vid;
+            }
         }
     }
 
@@ -1231,7 +1383,7 @@ OVS_REQ_RDLOCK(ml->rwlock)
         }
     }
 
-    return mac->port.p != in_xbundle->ofbundle;
+    return mac_entry_get_port(ml, mac) != in_xbundle->ofbundle;
 }
 
 
@@ -1267,7 +1419,7 @@ OVS_REQ_WRLOCK(xbridge->ml->rwlock)
         }
     }
 
-    if (mac->port.p != in_xbundle->ofbundle) {
+    if (mac_entry_get_port(xbridge->ml, mac) != in_xbundle->ofbundle) {
         /* The log messages here could actually be useful in debugging,
          * so keep the rate limit relatively high. */
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(30, 300);
@@ -1277,8 +1429,7 @@ OVS_REQ_WRLOCK(xbridge->ml->rwlock)
                     xbridge->name, ETH_ADDR_ARGS(flow->dl_src),
                     in_xbundle->name, vlan);
 
-        mac->port.p = in_xbundle->ofbundle;
-        mac_learning_changed(xbridge->ml);
+        mac_entry_set_port(xbridge->ml, mac, in_xbundle->ofbundle);
     }
 }
 
@@ -1352,9 +1503,10 @@ is_admissible(struct xlate_ctx *ctx, struct xport *in_port,
         case BV_DROP_IF_MOVED:
             ovs_rwlock_rdlock(&xbridge->ml->rwlock);
             mac = mac_learning_lookup(xbridge->ml, flow->dl_src, vlan);
-            if (mac && mac->port.p != in_xbundle->ofbundle &&
-                (!is_gratuitous_arp(flow, &ctx->xout->wc)
-                 || mac_entry_is_grat_arp_locked(mac))) {
+            if (mac
+                && mac_entry_get_port(xbridge->ml, mac) != in_xbundle->ofbundle
+                && (!is_gratuitous_arp(flow, &ctx->xout->wc)
+                    || mac_entry_is_grat_arp_locked(mac))) {
                 ovs_rwlock_unlock(&xbridge->ml->rwlock);
                 xlate_report(ctx, "SLB bond thinks this packet looped back, "
                              "dropping");
@@ -1435,11 +1587,20 @@ xlate_normal(struct xlate_ctx *ctx)
     if (ctx->xin->may_learn) {
         update_learning_table(ctx->xbridge, flow, wc, vlan, in_xbundle);
     }
+    if (ctx->xin->xcache) {
+        struct xc_entry *entry;
+
+        /* Save just enough info to update the mac learning table later. */
+        entry = xlate_cache_add_entry(ctx->xin->xcache, XC_NORMAL);
+        entry->u.normal.ofproto = ctx->xbridge->ofproto;
+        entry->u.normal.flow = xmemdup(flow, sizeof *flow);
+        entry->u.normal.vlan = vlan;
+    }
 
     /* Determine output bundle. */
     ovs_rwlock_rdlock(&ctx->xbridge->ml->rwlock);
     mac = mac_learning_lookup(ctx->xbridge->ml, flow->dl_dst, vlan);
-    mac_port = mac ? mac->port.p : NULL;
+    mac_port = mac ? mac_entry_get_port(ctx->xbridge->ml, mac) : NULL;
     ovs_rwlock_unlock(&ctx->xbridge->ml->rwlock);
 
     if (mac_port) {
@@ -1492,8 +1653,10 @@ compose_sample_action(const struct xbridge *xbridge,
     actions_offset = nl_msg_start_nested(odp_actions, OVS_SAMPLE_ATTR_ACTIONS);
 
     odp_port = ofp_port_to_odp_port(xbridge, flow->in_port.ofp_port);
-    pid = dpif_port_get_pid(xbridge->dpif, odp_port, 0);
-    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size, odp_actions);
+    pid = dpif_port_get_pid(xbridge->dpif, odp_port,
+                            flow_hash_5tuple(flow, 0));
+    cookie_offset = odp_put_userspace_action(pid, cookie, cookie_size,
+                                             odp_actions);
 
     nl_msg_end_nested(odp_actions, actions_offset);
     nl_msg_end_nested(odp_actions, sample_offset);
@@ -1654,11 +1817,7 @@ process_special(struct xlate_ctx *ctx, const struct flow *flow,
             bfd_process_packet(xport->bfd, flow, packet);
             /* If POLL received, immediately sends FINAL back. */
             if (bfd_should_send_packet(xport->bfd)) {
-                if (xport->peer) {
-                    ofproto_dpif_monitor_port_send_soon(xport->ofport);
-                } else {
-                    ofproto_dpif_monitor_port_send_soon_safe(xport->ofport);
-                }
+                ofproto_dpif_monitor_port_send_soon(xport->ofport);
             }
         }
         return SLOW_BFD;
@@ -1693,7 +1852,7 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
 
     /* If 'struct flow' gets additional metadata, we'll need to zero it out
      * before traversing a patch port. */
-    BUILD_ASSERT_DECL(FLOW_WC_SEQ == 25);
+    BUILD_ASSERT_DECL(FLOW_WC_SEQ == 26);
 
     if (!xport) {
         xlate_report(ctx, "Nonexistent output port");
@@ -1702,8 +1861,8 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
         xlate_report(ctx, "OFPPC_NO_FWD set, skipping output");
         return;
     } else if (check_stp) {
-        if (eth_addr_equals(ctx->base_flow.dl_dst, eth_addr_stp)) {
-            if (!xport_stp_listen_state(xport)) {
+        if (is_stp(&ctx->base_flow)) {
+            if (!xport_stp_should_forward_bpdu(xport)) {
                 xlate_report(ctx, "STP not in listening state, "
                              "skipping bpdu output");
                 return;
@@ -1724,6 +1883,7 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
         const struct xport *peer = xport->peer;
         struct flow old_flow = ctx->xin->flow;
         enum slow_path_reason special;
+        uint8_t table_id = rule_dpif_lookup_get_init_table_id(&ctx->xin->flow);
 
         ctx->xbridge = peer->xbridge;
         flow->in_port.ofp_port = peer->ofp_port;
@@ -1737,14 +1897,16 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
             ctx->xout->slow |= special;
         } else if (may_receive(peer, ctx)) {
             if (xport_stp_forward_state(peer)) {
-                xlate_table_action(ctx, flow->in_port.ofp_port, 0, true, true);
+                xlate_table_action(ctx, flow->in_port.ofp_port, table_id,
+                                   true, true);
             } else {
                 /* Forwarding is disabled by STP.  Let OFPP_NORMAL and the
                  * learning action look at the packet, then drop it. */
                 struct flow old_base_flow = ctx->base_flow;
                 size_t old_size = ofpbuf_size(&ctx->xout->odp_actions);
                 mirror_mask_t old_mirrors = ctx->xout->mirrors;
-                xlate_table_action(ctx, flow->in_port.ofp_port, 0, true, true);
+                xlate_table_action(ctx, flow->in_port.ofp_port, table_id,
+                                   true, true);
                 ctx->xout->mirrors = old_mirrors;
                 ctx->base_flow = old_base_flow;
                 ofpbuf_set_size(&ctx->xout->odp_actions, old_size);
@@ -1761,6 +1923,14 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
                 bfd_account_rx(peer->bfd, ctx->xin->resubmit_stats);
             }
         }
+        if (ctx->xin->xcache) {
+            struct xc_entry *entry;
+
+            entry = xlate_cache_add_entry(ctx->xin->xcache, XC_NETDEV);
+            entry->u.dev.tx = netdev_ref(xport->netdev);
+            entry->u.dev.rx = netdev_ref(peer->netdev);
+            entry->u.dev.bfd = bfd_ref(peer->bfd);
+        }
 
         return;
     }
@@ -1770,7 +1940,7 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
     flow_nw_tos = flow->nw_tos;
 
     if (dscp_from_skb_priority(xport, flow->skb_priority, &dscp)) {
-        wc->masks.nw_tos |= IP_ECN_MASK;
+        wc->masks.nw_tos |= IP_DSCP_MASK;
         flow->nw_tos &= ~IP_DSCP_MASK;
         flow->nw_tos |= dscp;
     }
@@ -1793,6 +1963,12 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
         if (ctx->xin->resubmit_stats) {
             netdev_vport_inc_tx(xport->netdev, ctx->xin->resubmit_stats);
         }
+        if (ctx->xin->xcache) {
+            struct xc_entry *entry;
+
+            entry = xlate_cache_add_entry(ctx->xin->xcache, XC_NETDEV);
+            entry->u.dev.tx = netdev_ref(xport->netdev);
+        }
         out_port = odp_port;
         commit_odp_tunnel_action(flow, &ctx->base_flow,
                                  &ctx->xout->odp_actions);
@@ -1817,8 +1993,25 @@ compose_output_action__(struct xlate_ctx *ctx, ofp_port_t ofp_port,
         ctx->xout->slow |= commit_odp_actions(flow, &ctx->base_flow,
                                               &ctx->xout->odp_actions,
                                               &ctx->xout->wc);
-        nl_msg_put_odp_port(&ctx->xout->odp_actions, OVS_ACTION_ATTR_OUTPUT,
-                            out_port);
+
+        if (ctx->use_recirc) {
+            struct ovs_action_hash *act_hash;
+            struct xlate_recirc *xr = &ctx->recirc;
+
+            /* Hash action. */
+            act_hash = nl_msg_put_unspec_uninit(&ctx->xout->odp_actions,
+                                                OVS_ACTION_ATTR_HASH,
+                                                sizeof *act_hash);
+            act_hash->hash_alg = xr->hash_alg;
+            act_hash->hash_basis = xr->hash_basis;
+
+            /* Recirc action. */
+            nl_msg_put_u32(&ctx->xout->odp_actions, OVS_ACTION_ATTR_RECIRC,
+                           xr->recirc_id);
+        } else {
+            nl_msg_put_odp_port(&ctx->xout->odp_actions, OVS_ACTION_ATTR_OUTPUT,
+                                out_port);
+        }
 
         ctx->sflow_odp_port = odp_port;
         ctx->sflow_n_outputs++;
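
With recirculation in use, the bond member is chosen only after the packet
re-enters the datapath, so a megaflow for a balance-tcp bond ends in a hash
plus recirc action rather than an immediate output.  Roughly (datapath flow
syntax approximate; the recirc ID and hash basis are illustrative):

    actions:hash(hash_l4(0)),recirc(1)

The post-recirculation lookup then hits the rules installed by
bond_update_post_recirc_rules() above, which perform the actual output.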
@@ -1842,7 +2035,7 @@ static void
 xlate_recursively(struct xlate_ctx *ctx, struct rule_dpif *rule)
 {
     struct rule_dpif *old_rule = ctx->rule;
-    struct rule_actions *actions;
+    const struct rule_actions *actions;
 
     if (ctx->xin->resubmit_stats) {
         rule_dpif_credit_stats(rule, ctx->xin->resubmit_stats);
@@ -1862,10 +2055,10 @@ xlate_resubmit_resource_check(struct xlate_ctx *ctx)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
 
-    if (ctx->recurse >= MAX_RESUBMIT_RECURSION) {
+    if (ctx->recurse >= MAX_RESUBMIT_RECURSION + MAX_INTERNAL_RESUBMITS) {
         VLOG_ERR_RL(&rl, "resubmit actions recursed over %d times",
                     MAX_RESUBMIT_RECURSION);
-    } else if (ctx->resubmits >= MAX_RESUBMITS) {
+    } else if (ctx->resubmits >= MAX_RESUBMITS + MAX_INTERNAL_RESUBMITS) {
         VLOG_ERR_RL(&rl, "over %d resubmit actions", MAX_RESUBMITS);
     } else if (ofpbuf_size(&ctx->xout->odp_actions) > UINT16_MAX) {
         VLOG_ERR_RL(&rl, "resubmits yielded over 64 kB of actions");
@@ -1901,7 +2094,8 @@ xlate_table_action(struct xlate_ctx *ctx, ofp_port_t in_port, uint8_t table_id,
                                               !skip_wildcards
                                               ? &ctx->xout->wc : NULL,
                                               honor_table_miss,
-                                              &ctx->table_id, &rule);
+                                              &ctx->table_id, &rule,
+                                              ctx->xin->xcache != NULL);
         ctx->xin->flow.in_port.ofp_port = old_in_port;
 
         if (ctx->xin->resubmit_hook) {
@@ -1934,12 +2128,22 @@ xlate_table_action(struct xlate_ctx *ctx, ofp_port_t in_port, uint8_t table_id,
         }
 
         choose_miss_rule(config, ctx->xbridge->miss_rule,
-                         ctx->xbridge->no_packet_in_rule, &rule);
+                         ctx->xbridge->no_packet_in_rule, &rule,
+                         ctx->xin->xcache != NULL);
 
 match:
         if (rule) {
+            /* Fill in the cache entry here instead of xlate_recursively
+             * to make the reference counting more explicit.  We take a
+             * reference in the lookups above if we are going to cache the
+             * rule. */
+            if (ctx->xin->xcache) {
+                struct xc_entry *entry;
+
+                entry = xlate_cache_add_entry(ctx->xin->xcache, XC_RULE);
+                entry->u.rule = rule;
+            }
             xlate_recursively(ctx, rule);
-            rule_dpif_unref(rule);
         }
 
         ctx->table_id = old_table_id;
@@ -1954,6 +2158,7 @@ xlate_group_bucket(struct xlate_ctx *ctx, const struct ofputil_bucket *bucket)
 {
     uint64_t action_list_stub[1024 / 8];
     struct ofpbuf action_list, action_set;
+    struct flow old_flow = ctx->xin->flow;
 
     ofpbuf_use_const(&action_set, bucket->ofpacts, bucket->ofpacts_len);
     ofpbuf_use_stub(&action_list, action_list_stub, sizeof action_list_stub);
@@ -1965,6 +2170,25 @@ xlate_group_bucket(struct xlate_ctx *ctx, const struct ofputil_bucket *bucket)
 
     ofpbuf_uninit(&action_set);
     ofpbuf_uninit(&action_list);
+
+    /* Roll back flow to previous state.
+     * This is equivalent to cloning the packet for each bucket.
+     *
+     * As a side effect any subsequently applied actions will
+     * also effectively be applied to a clone of the packet taken
+     * just before applying the all or indirect group.
+     *
+     * Note that group buckets are action sets, hence they cannot modify the
+     * main action set.  Also any stack actions are ignored when executing an
+     * action set, so group buckets cannot change the stack either. */
+    ctx->xin->flow = old_flow;
+
+    /* The fact that the group bucket exits (for any reason) does not mean that
+     * the translation after the group action should exit.  Specifically, if
+     * the group bucket recirculates (which typically modifies the packet), the
+     * actions after the group action must continue processing with the
+     * original, not the recirculated packet! */
+    ctx->exit = false;
 }
 
 static void
@@ -1972,19 +2196,11 @@ xlate_all_group(struct xlate_ctx *ctx, struct group_dpif *group)
 {
     const struct ofputil_bucket *bucket;
     const struct list *buckets;
-    struct flow old_flow = ctx->xin->flow;
 
     group_dpif_get_buckets(group, &buckets);
 
     LIST_FOR_EACH (bucket, list_node, buckets) {
         xlate_group_bucket(ctx, bucket);
-        /* Roll back flow to previous state.
-         * This is equivalent to cloning the packet for each bucket.
-         *
-         * As a side effect any subsequently applied actions will
-         * also effectively be applied to a clone of the packet taken
-         * just before applying the all or indirect group. */
-        ctx->xin->flow = old_flow;
     }
 }
 
@@ -2086,6 +2302,15 @@ xlate_ofpact_resubmit(struct xlate_ctx *ctx,
 {
     ofp_port_t in_port;
     uint8_t table_id;
+    bool may_packet_in = false;
+    bool honor_table_miss = false;
+
+    if (ctx->rule && rule_dpif_is_internal(ctx->rule)) {
+        /* Still allow missed packets to be sent to the controller
+         * if resubmitting from an internal table. */
+        may_packet_in = true;
+        honor_table_miss = true;
+    }
 
     in_port = resubmit->in_port;
     if (in_port == OFPP_IN_PORT) {
@@ -2097,7 +2322,8 @@ xlate_ofpact_resubmit(struct xlate_ctx *ctx,
         table_id = ctx->table_id;
     }
 
-    xlate_table_action(ctx, in_port, table_id, false, false);
+    xlate_table_action(ctx, in_port, table_id, may_packet_in,
+                       honor_table_miss);
 }
 
 static void
@@ -2462,34 +2688,67 @@ xlate_bundle_action(struct xlate_ctx *ctx,
 }
 
 static void
-xlate_learn_action(struct xlate_ctx *ctx,
-                   const struct ofpact_learn *learn)
+xlate_learn_action__(struct xlate_ctx *ctx, const struct ofpact_learn *learn,
+                     struct ofputil_flow_mod *fm, struct ofpbuf *ofpacts)
 {
-    uint64_t ofpacts_stub[1024 / 8];
-    struct ofputil_flow_mod fm;
-    struct ofpbuf ofpacts;
+    learn_execute(learn, &ctx->xin->flow, fm, ofpacts);
+    if (ctx->xin->may_learn) {
+        ofproto_dpif_flow_mod(ctx->xbridge->ofproto, fm);
+    }
+}
 
+static void
+xlate_learn_action(struct xlate_ctx *ctx, const struct ofpact_learn *learn)
+{
     ctx->xout->has_learn = true;
-
     learn_mask(learn, &ctx->xout->wc);
 
-    if (!ctx->xin->may_learn) {
-        return;
+    if (ctx->xin->xcache) {
+        struct xc_entry *entry;
+
+        entry = xlate_cache_add_entry(ctx->xin->xcache, XC_LEARN);
+        entry->u.learn.ofproto = ctx->xbridge->ofproto;
+        entry->u.learn.fm = xmalloc(sizeof *entry->u.learn.fm);
+        entry->u.learn.ofpacts = ofpbuf_new(64);
+        xlate_learn_action__(ctx, learn, entry->u.learn.fm,
+                             entry->u.learn.ofpacts);
+    } else if (ctx->xin->may_learn) {
+        uint64_t ofpacts_stub[1024 / 8];
+        struct ofputil_flow_mod fm;
+        struct ofpbuf ofpacts;
+
+        ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
+        xlate_learn_action__(ctx, learn, &fm, &ofpacts);
+        ofpbuf_uninit(&ofpacts);
     }
+}
 
-    ofpbuf_use_stub(&ofpacts, ofpacts_stub, sizeof ofpacts_stub);
-    learn_execute(learn, &ctx->xin->flow, &fm, &ofpacts);
-    ofproto_dpif_flow_mod(ctx->xbridge->ofproto, &fm);
-    ofpbuf_uninit(&ofpacts);
+static void
+xlate_fin_timeout__(struct rule_dpif *rule, uint16_t tcp_flags,
+                    uint16_t idle_timeout, uint16_t hard_timeout)
+{
+    if (tcp_flags & (TCP_FIN | TCP_RST)) {
+        rule_dpif_reduce_timeouts(rule, idle_timeout, hard_timeout);
+    }
 }
 
 static void
 xlate_fin_timeout(struct xlate_ctx *ctx,
                   const struct ofpact_fin_timeout *oft)
 {
-    if (ctx->xin->tcp_flags & (TCP_FIN | TCP_RST) && ctx->rule) {
-        rule_dpif_reduce_timeouts(ctx->rule, oft->fin_idle_timeout,
-                                  oft->fin_hard_timeout);
+    if (ctx->rule) {
+        xlate_fin_timeout__(ctx->rule, ctx->xin->tcp_flags,
+                            oft->fin_idle_timeout, oft->fin_hard_timeout);
+        if (ctx->xin->xcache) {
+            struct xc_entry *entry;
+
+            entry = xlate_cache_add_entry(ctx->xin->xcache, XC_FIN_TIMEOUT);
+            /* XC_RULE already holds a reference on the rule, none is taken
+             * here. */
+            entry->u.fin.rule = ctx->rule;
+            entry->u.fin.idle = oft->fin_idle_timeout;
+            entry->u.fin.hard = oft->fin_hard_timeout;
+        }
     }
 }
 
@@ -2524,7 +2783,7 @@ xlate_sample_action(struct xlate_ctx *ctx,
 static bool
 may_receive(const struct xport *xport, struct xlate_ctx *ctx)
 {
-    if (xport->config & (eth_addr_equals(ctx->xin->flow.dl_dst, eth_addr_stp)
+    if (xport->config & (is_stp(&ctx->xin->flow)
                          ? OFPUTIL_PC_NO_RECV_STP
                          : OFPUTIL_PC_NO_RECV)) {
         return false;
@@ -2725,15 +2984,22 @@ do_xlate_actions(const struct ofpact *ofpacts, size_t ofpacts_len,
         case OFPACT_SET_FIELD:
             set_field = ofpact_get_SET_FIELD(a);
             mf = set_field->field;
-            mf_mask_field_and_prereqs(mf, &wc->masks);
 
             /* Set field action only ever overwrites packet's outermost
              * applicable header fields.  Do nothing if no header exists. */
-            if ((mf->id != MFF_VLAN_VID || flow->vlan_tci & htons(VLAN_CFI))
-                && ((mf->id != MFF_MPLS_LABEL && mf->id != MFF_MPLS_TC)
-                    || eth_type_mpls(flow->dl_type))) {
-                mf_set_flow_value(mf, &set_field->value, flow);
+            if (mf->id == MFF_VLAN_VID) {
+                wc->masks.vlan_tci |= htons(VLAN_CFI);
+                if (!(flow->vlan_tci & htons(VLAN_CFI))) {
+                    break;
+                }
+            } else if ((mf->id == MFF_MPLS_LABEL || mf->id == MFF_MPLS_TC)
+                       /* 'dl_type' is already unwildcarded. */
+                       && !eth_type_mpls(flow->dl_type)) {
+                break;
             }
+
+            mf_mask_field_and_prereqs(mf, &wc->masks);
+            mf_set_flow_value(mf, &set_field->value, flow);
             break;
 
         case OFPACT_STACK_PUSH:
@@ -2854,6 +3120,7 @@ xlate_in_init(struct xlate_in *xin, struct ofproto_dpif *ofproto,
     xin->packet = packet;
     xin->may_learn = packet != NULL;
     xin->rule = rule;
+    xin->xcache = NULL;
     xin->ofpacts = NULL;
     xin->ofpacts_len = 0;
     xin->tcp_flags = tcp_flags;
@@ -2964,9 +3231,80 @@ void
 xlate_actions(struct xlate_in *xin, struct xlate_out *xout)
     OVS_EXCLUDED(xlate_rwlock)
 {
-    ovs_rwlock_rdlock(&xlate_rwlock);
+    fat_rwlock_rdlock(&xlate_rwlock);
     xlate_actions__(xin, xout);
-    ovs_rwlock_unlock(&xlate_rwlock);
+    fat_rwlock_unlock(&xlate_rwlock);
+}
+
+/* Returns the maximum number of packets that the Linux kernel is willing to
+ * queue up internally to certain kinds of software-implemented ports, or the
+ * default (and rarely modified) value if it cannot be determined. */
+static int
+netdev_max_backlog(void)
+{
+    static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
+    static int max_backlog = 1000; /* The normal default value. */
+
+    if (ovsthread_once_start(&once)) {
+        static const char filename[] = "/proc/sys/net/core/netdev_max_backlog";
+        FILE *stream;
+        int n;
+
+        stream = fopen(filename, "r");
+        if (!stream) {
+            VLOG_WARN("%s: open failed (%s)", filename, ovs_strerror(errno));
+        } else {
+            if (fscanf(stream, "%d", &n) != 1) {
+                VLOG_WARN("%s: read error", filename);
+            } else if (n <= 100) {
+                VLOG_WARN("%s: unexpectedly small value %d", filename, n);
+            } else {
+                max_backlog = n;
+            }
+            fclose(stream);
+        }
+        ovsthread_once_done(&once);
+
+        VLOG_DBG("%s: using %d max_backlog", filename, max_backlog);
+    }
+
+    return max_backlog;
+}
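
The parse is deliberately defensive: the file is read once per process (via
ovsthread_once), values that look implausibly small are rejected, and any
failure falls back to the stock default of 1000.  On a Linux host the
effective value can be inspected with "sysctl net.core.netdev_max_backlog".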
+
+/* Counts and returns the number of OVS_ACTION_ATTR_OUTPUT actions in
+ * 'odp_actions'. */
+static int
+count_output_actions(const struct ofpbuf *odp_actions)
+{
+    const struct nlattr *a;
+    size_t left;
+    int n = 0;
+
+    NL_ATTR_FOR_EACH_UNSAFE (a, left, ofpbuf_data(odp_actions),
+                             ofpbuf_size(odp_actions)) {
+        if (a->nla_type == OVS_ACTION_ATTR_OUTPUT) {
+            n++;
+        }
+    }
+    return n;
+}
+
+/* Returns true if 'odp_actions' contains more output actions than the datapath
+ * can reliably handle in one go.  On Linux, this is the value of the
+ * net.core.netdev_max_backlog sysctl, which limits the maximum number of
+ * packets that the kernel is willing to queue up for processing while the
+ * datapath is processing a set of actions. */
+static bool
+too_many_output_actions(const struct ofpbuf *odp_actions)
+{
+#ifdef __linux__
+    return (ofpbuf_size(odp_actions) / NL_A_U32_SIZE > netdev_max_backlog()
+            && count_output_actions(odp_actions) > netdev_max_backlog());
+#else
+    /* OSes other than Linux might have similar limits, but we don't know how
+     * to determine them. */
+    return false;
+#endif
 }
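
A worked example of the short-circuit, assuming the Linux default backlog of
1000: an OVS_ACTION_ATTR_OUTPUT attribute occupies NL_A_U32_SIZE (8) bytes,
so an action list under 8000 bytes cannot possibly contain more than 1000
output actions, and the O(n) count_output_actions() pass is skipped
entirely; only genuinely huge action lists pay for the exact count.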
 
 /* Translates the 'ofpacts_len' bytes of "struct ofpacts" starting at 'ofpacts'
@@ -2982,7 +3320,7 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
     struct flow *flow = &xin->flow;
     struct rule_dpif *rule = NULL;
 
-    struct rule_actions *actions = NULL;
+    const struct rule_actions *actions = NULL;
     enum slow_path_reason special;
     const struct ofpact *ofpacts;
     struct xport *in_port;
@@ -3029,7 +3367,7 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
 
     ctx.xbridge = xbridge_lookup(xin->ofproto);
     if (!ctx.xbridge) {
-        goto out;
+        return;
     }
 
     ctx.rule = xin->rule;
@@ -3058,14 +3396,21 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
     ctx.orig_skb_priority = flow->skb_priority;
     ctx.table_id = 0;
     ctx.exit = false;
+    ctx.use_recirc = false;
 
     if (!xin->ofpacts && !ctx.rule) {
         ctx.table_id = rule_dpif_lookup(ctx.xbridge->ofproto, flow,
                                         !xin->skip_wildcards ? wc : NULL,
-                                        &rule);
+                                        &rule, ctx.xin->xcache != NULL);
         if (ctx.xin->resubmit_stats) {
             rule_dpif_credit_stats(rule, ctx.xin->resubmit_stats);
         }
+        if (ctx.xin->xcache) {
+            struct xc_entry *entry;
+
+            entry = xlate_cache_add_entry(ctx.xin->xcache, XC_RULE);
+            entry->u.rule = rule;
+        }
         ctx.rule = rule;
     }
     xout->fail_open = ctx.rule && rule_dpif_is_fail_open(ctx.rule);
@@ -3100,7 +3445,7 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
             break;
 
         case OFPC_FRAG_DROP:
-            goto out;
+            return;
 
         case OFPC_FRAG_REASM:
             OVS_NOT_REACHED();
@@ -3115,10 +3460,19 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
     }
 
     in_port = get_ofp_port(ctx.xbridge, flow->in_port.ofp_port);
-    if (in_port && in_port->is_tunnel && ctx.xin->resubmit_stats) {
-        netdev_vport_inc_rx(in_port->netdev, ctx.xin->resubmit_stats);
-        if (in_port->bfd) {
-            bfd_account_rx(in_port->bfd, ctx.xin->resubmit_stats);
+    if (in_port && in_port->is_tunnel) {
+        if (ctx.xin->resubmit_stats) {
+            netdev_vport_inc_rx(in_port->netdev, ctx.xin->resubmit_stats);
+            if (in_port->bfd) {
+                bfd_account_rx(in_port->bfd, ctx.xin->resubmit_stats);
+            }
+        }
+        if (ctx.xin->xcache) {
+            struct xc_entry *entry;
+
+            entry = xlate_cache_add_entry(ctx.xin->xcache, XC_NETDEV);
+            entry->u.dev.rx = netdev_ref(in_port->netdev);
+            entry->u.dev.bfd = bfd_ref(in_port->bfd);
         }
     }
 
@@ -3173,30 +3527,47 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
          * prevent the flow from being installed. */
         COVERAGE_INC(xlate_actions_oversize);
         ctx.xout->slow |= SLOW_ACTION;
+    } else if (too_many_output_actions(&ctx.xout->odp_actions)) {
+        COVERAGE_INC(xlate_actions_too_many_output);
+        ctx.xout->slow |= SLOW_ACTION;
     }
 
-    if (ctx.xin->resubmit_stats) {
-        mirror_update_stats(ctx.xbridge->mbridge, xout->mirrors,
-                            ctx.xin->resubmit_stats->n_packets,
-                            ctx.xin->resubmit_stats->n_bytes);
-
-        if (ctx.xbridge->netflow) {
-            const struct ofpact *ofpacts;
-            size_t ofpacts_len;
-
-            ofpacts_len = actions->ofpacts_len;
-            ofpacts = actions->ofpacts;
-            if (ofpacts_len == 0
-                || ofpacts->type != OFPACT_CONTROLLER
-                || ofpact_next(ofpacts) < ofpact_end(ofpacts, ofpacts_len)) {
-                /* Only update netflow if we don't have controller flow.  We don't
-                 * report NetFlow expiration messages for such facets because they
-                 * are just part of the control logic for the network, not real
-                 * traffic. */
+    if (mbridge_has_mirrors(ctx.xbridge->mbridge)) {
+        if (ctx.xin->resubmit_stats) {
+            mirror_update_stats(ctx.xbridge->mbridge, xout->mirrors,
+                                ctx.xin->resubmit_stats->n_packets,
+                                ctx.xin->resubmit_stats->n_bytes);
+        }
+        if (ctx.xin->xcache) {
+            struct xc_entry *entry;
+
+            entry = xlate_cache_add_entry(ctx.xin->xcache, XC_MIRROR);
+            entry->u.mirror.mbridge = mbridge_ref(ctx.xbridge->mbridge);
+            entry->u.mirror.mirrors = xout->mirrors;
+        }
+    }
+
+    if (ctx.xbridge->netflow) {
+        /* Only update netflow if we don't have controller flow.  We don't
+         * report NetFlow expiration messages for such facets because they
+         * are just part of the control logic for the network, not real
+         * traffic. */
+        if (ofpacts_len == 0
+            || ofpacts->type != OFPACT_CONTROLLER
+            || ofpact_next(ofpacts) < ofpact_end(ofpacts, ofpacts_len)) {
+            if (ctx.xin->resubmit_stats) {
                 netflow_flow_update(ctx.xbridge->netflow, flow,
                                     xout->nf_output_iface,
                                     ctx.xin->resubmit_stats);
             }
+            if (ctx.xin->xcache) {
+                struct xc_entry *entry;
+
+                entry = xlate_cache_add_entry(ctx.xin->xcache, XC_NETFLOW);
+                entry->u.nf.netflow = netflow_ref(ctx.xbridge->netflow);
+                entry->u.nf.flow = xmemdup(flow, sizeof *flow);
+                entry->u.nf.iface = xout->nf_output_iface;
+            }
         }
     }
 
@@ -3221,9 +3592,6 @@ xlate_actions__(struct xlate_in *xin, struct xlate_out *xout)
         wc->masks.tp_src &= htons(UINT8_MAX);
         wc->masks.tp_dst &= htons(UINT8_MAX);
     }
-
-out:
-    rule_dpif_unref(rule);
 }
 
 /* Sends 'packet' out 'ofport'.
@@ -3241,17 +3609,198 @@ xlate_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet)
     flow_extract(packet, NULL, &flow);
     flow.in_port.ofp_port = OFPP_NONE;
 
-    ovs_rwlock_rdlock(&xlate_rwlock);
+    fat_rwlock_rdlock(&xlate_rwlock);
     xport = xport_lookup(ofport);
     if (!xport) {
-        ovs_rwlock_unlock(&xlate_rwlock);
+        fat_rwlock_unlock(&xlate_rwlock);
         return EINVAL;
     }
     output.port = xport->ofp_port;
     output.max_len = 0;
-    ovs_rwlock_unlock(&xlate_rwlock);
+    fat_rwlock_unlock(&xlate_rwlock);
 
     return ofproto_dpif_execute_actions(xport->xbridge->ofproto, &flow, NULL,
                                         &output.ofpact, sizeof output,
                                         packet);
 }
+
+struct xlate_cache *
+xlate_cache_new(void)
+{
+    struct xlate_cache *xcache = xmalloc(sizeof *xcache);
+
+    ofpbuf_init(&xcache->entries, 512);
+    return xcache;
+}
+
+static struct xc_entry *
+xlate_cache_add_entry(struct xlate_cache *xcache, enum xc_type type)
+{
+    struct xc_entry *entry;
+
+    entry = ofpbuf_put_zeros(&xcache->entries, sizeof *entry);
+    entry->type = type;
+
+    return entry;
+}
+
+static void
+xlate_cache_netdev(struct xc_entry *entry, const struct dpif_flow_stats *stats)
+{
+    if (entry->u.dev.tx) {
+        netdev_vport_inc_tx(entry->u.dev.tx, stats);
+    }
+    if (entry->u.dev.rx) {
+        netdev_vport_inc_rx(entry->u.dev.rx, stats);
+    }
+    if (entry->u.dev.bfd) {
+        bfd_account_rx(entry->u.dev.bfd, stats);
+    }
+}
+
+static void
+xlate_cache_normal(struct ofproto_dpif *ofproto, struct flow *flow, int vlan)
+{
+    struct xbridge *xbridge;
+    struct xbundle *xbundle;
+    struct flow_wildcards wc;
+
+    xbridge = xbridge_lookup(ofproto);
+    if (!xbridge) {
+        return;
+    }
+
+    xbundle = lookup_input_bundle(xbridge, flow->in_port.ofp_port, false,
+                                  NULL);
+    if (!xbundle) {
+        return;
+    }
+
+    update_learning_table(xbridge, flow, &wc, vlan, xbundle);
+}
+
+/* Push stats and perform side effects of flow translation. */
+void
+xlate_push_stats(struct xlate_cache *xcache, bool may_learn,
+                 const struct dpif_flow_stats *stats)
+{
+    struct xc_entry *entry;
+    struct ofpbuf entries = xcache->entries;
+
+    XC_ENTRY_FOR_EACH (entry, entries, xcache) {
+        switch (entry->type) {
+        case XC_RULE:
+            rule_dpif_credit_stats(entry->u.rule, stats);
+            break;
+        case XC_BOND:
+            bond_account(entry->u.bond.bond, entry->u.bond.flow,
+                         entry->u.bond.vid, stats->n_bytes);
+            break;
+        case XC_NETDEV:
+            xlate_cache_netdev(entry, stats);
+            break;
+        case XC_NETFLOW:
+            netflow_flow_update(entry->u.nf.netflow, entry->u.nf.flow,
+                                entry->u.nf.iface, stats);
+            break;
+        case XC_MIRROR:
+            mirror_update_stats(entry->u.mirror.mbridge,
+                                entry->u.mirror.mirrors,
+                                stats->n_packets, stats->n_bytes);
+            break;
+        case XC_LEARN:
+            if (may_learn) {
+                ofproto_dpif_flow_mod(entry->u.learn.ofproto,
+                                      entry->u.learn.fm);
+            }
+            break;
+        case XC_NORMAL:
+            xlate_cache_normal(entry->u.normal.ofproto, entry->u.normal.flow,
+                               entry->u.normal.vlan);
+            break;
+        case XC_FIN_TIMEOUT:
+            xlate_fin_timeout__(entry->u.fin.rule, stats->tcp_flags,
+                                entry->u.fin.idle, entry->u.fin.hard);
+            break;
+        default:
+            OVS_NOT_REACHED();
+        }
+    }
+}
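
A hedged sketch of a caller's side of xlate_push_stats(): a revalidator-style
thread that has kept the 'xcache' from an earlier translation and now holds a
statistics reading for the datapath flow.  The 'last'/'stats' bookkeeping and
the may-learn heuristic are illustrative assumptions, not part of this patch:

    /* 'stats' is the latest reading from the datapath; 'last' is what was
     * already credited.  Push only the difference. */
    struct dpif_flow_stats push = {
        .n_packets = stats.n_packets - last.n_packets,
        .n_bytes = stats.n_bytes - last.n_bytes,
        .used = stats.used,
        .tcp_flags = stats.tcp_flags,
    };

    /* Run learning side effects only if new packets actually arrived
     * (illustrative policy). */
    xlate_push_stats(xcache, push.n_packets > 0, &push);
    last = stats;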
+
+static void
+xlate_dev_unref(struct xc_entry *entry)
+{
+    if (entry->u.dev.tx) {
+        netdev_close(entry->u.dev.tx);
+    }
+    if (entry->u.dev.rx) {
+        netdev_close(entry->u.dev.rx);
+    }
+    if (entry->u.dev.bfd) {
+        bfd_unref(entry->u.dev.bfd);
+    }
+}
+
+static void
+xlate_cache_clear_netflow(struct netflow *netflow, struct flow *flow)
+{
+    netflow_flow_clear(netflow, flow);
+    netflow_unref(netflow);
+    free(flow);
+}
+
+void
+xlate_cache_clear(struct xlate_cache *xcache)
+{
+    struct xc_entry *entry;
+    struct ofpbuf entries;
+
+    if (!xcache) {
+        return;
+    }
+
+    XC_ENTRY_FOR_EACH (entry, entries, xcache) {
+        switch (entry->type) {
+        case XC_RULE:
+            rule_dpif_unref(entry->u.rule);
+            break;
+        case XC_BOND:
+            free(entry->u.bond.flow);
+            bond_unref(entry->u.bond.bond);
+            break;
+        case XC_NETDEV:
+            xlate_dev_unref(entry);
+            break;
+        case XC_NETFLOW:
+            xlate_cache_clear_netflow(entry->u.nf.netflow, entry->u.nf.flow);
+            break;
+        case XC_MIRROR:
+            mbridge_unref(entry->u.mirror.mbridge);
+            break;
+        case XC_LEARN:
+            free(entry->u.learn.fm);
+            ofpbuf_delete(entry->u.learn.ofpacts);
+            break;
+        case XC_NORMAL:
+            free(entry->u.normal.flow);
+            break;
+        case XC_FIN_TIMEOUT:
+            /* 'u.fin.rule' is always already held as an XC_RULE, which
+             * has already released its reference above. */
+            break;
+        default:
+            OVS_NOT_REACHED();
+        }
+    }
+
+    ofpbuf_clear(&xcache->entries);
+}
+
+void
+xlate_cache_delete(struct xlate_cache *xcache)
+{
+    xlate_cache_clear(xcache);
+    ofpbuf_uninit(&xcache->entries);
+    free(xcache);
+}