tests: Add bundle action test with buffer realloc.
[cascardo/ovs.git] / ofproto / ofproto-dpif.c
index 983cad5..b963ff2 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015, 2016 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
 #include "connmgr.h"
 #include "coverage.h"
 #include "cfm.h"
+#include "ovs-lldp.h"
 #include "dpif.h"
 #include "dynamic-string.h"
 #include "fail-open.h"
@@ -37,6 +38,7 @@
 #include "lacp.h"
 #include "learn.h"
 #include "mac-learning.h"
+#include "mcast-snooping.h"
 #include "meta-flow.h"
 #include "multipath.h"
 #include "netdev-vport.h"
@@ -58,6 +60,8 @@
 #include "ofproto-dpif-upcall.h"
 #include "ofproto-dpif-xlate.h"
 #include "poll-loop.h"
+#include "ovs-rcu.h"
+#include "ovs-router.h"
 #include "seq.h"
 #include "simap.h"
 #include "smap.h"
 #include "unaligned.h"
 #include "unixctl.h"
 #include "vlan-bitmap.h"
-#include "vlog.h"
+#include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif);
 
 COVERAGE_DEFINE(ofproto_dpif_expired);
 COVERAGE_DEFINE(packet_in_overflow);
 
-/* Number of implemented OpenFlow tables. */
-enum { N_TABLES = 255 };
-enum { TBL_INTERNAL = N_TABLES - 1 };    /* Used for internal hidden rules. */
-BUILD_ASSERT_DECL(N_TABLES >= 2 && N_TABLES <= 255);
-
 struct flow_miss;
 
 struct rule_dpif {
@@ -89,8 +88,26 @@ struct rule_dpif {
      *   recently been processed by a revalidator. */
     struct ovs_mutex stats_mutex;
     struct dpif_flow_stats stats OVS_GUARDED;
+
+   /* In non-NULL, will point to a new rule (for which a reference is held) to
+    * which all the stats updates should be forwarded. This exists only
+    * transitionally when flows are replaced.
+    *
+    * Protected by stats_mutex.  If both 'rule->stats_mutex' and
+    * 'rule->new_rule->stats_mutex' must be held together, acquire them in that
+    * order, */
+    struct rule_dpif *new_rule OVS_GUARDED;
+
+    /* If non-zero then the recirculation id that has
+     * been allocated for use with this rule.
+     * The recirculation id and associated internal flow should
+     * be freed when the rule is freed */
+    uint32_t recirc_id;
 };
 
+/* RULE_CAST() depends on this. */
+BUILD_ASSERT_DECL(offsetof(struct rule_dpif, up) == 0);
+
 static void rule_get_stats(struct rule *, uint64_t *packets, uint64_t *bytes,
                            long long int *used);
 static struct rule_dpif *rule_dpif_cast(const struct rule *);
@@ -106,7 +123,6 @@ struct group_dpif {
     struct ovs_mutex stats_mutex;
     uint64_t packet_count OVS_GUARDED;  /* Number of packets received. */
     uint64_t byte_count OVS_GUARDED;    /* Number of bytes received. */
-    struct bucket_counter *bucket_stats OVS_GUARDED;  /* Bucket statistics. */
 };
 
 struct ofbundle {
@@ -116,7 +132,7 @@ struct ofbundle {
     char *name;                 /* Identifier for log messages. */
 
     /* Configuration. */
-    struct list ports;          /* Contains "struct ofport"s. */
+    struct ovs_list ports;      /* Contains "struct ofport"s. */
     enum port_vlan_mode vlan_mode; /* VLAN mode */
     int vlan;                   /* -1=trunk port, else a 12-bit VLAN ID. */
     unsigned long *trunks;      /* Bitmap of trunked VLANs, if 'vlan' == -1.
@@ -135,21 +151,28 @@ static void bundle_destroy(struct ofbundle *);
 static void bundle_del_port(struct ofport_dpif *);
 static void bundle_run(struct ofbundle *);
 static void bundle_wait(struct ofbundle *);
+static void bundle_flush_macs(struct ofbundle *, bool);
+static void bundle_move(struct ofbundle *, struct ofbundle *);
 
 static void stp_run(struct ofproto_dpif *ofproto);
 static void stp_wait(struct ofproto_dpif *ofproto);
 static int set_stp_port(struct ofport *,
                         const struct ofproto_port_stp_settings *);
 
+static void rstp_run(struct ofproto_dpif *ofproto);
+static void set_rstp_port(struct ofport *,
+                         const struct ofproto_port_rstp_settings *);
+
 struct ofport_dpif {
     struct hmap_node odp_port_node; /* In dpif_backer's "odp_to_ofport_map". */
     struct ofport up;
 
     odp_port_t odp_port;
     struct ofbundle *bundle;    /* Bundle that contains this port, if any. */
-    struct list bundle_node;    /* In struct ofbundle's "ports" list. */
+    struct ovs_list bundle_node;/* In struct ofbundle's "ports" list. */
     struct cfm *cfm;            /* Connectivity Fault Management, if any. */
     struct bfd *bfd;            /* BFD, if any. */
+    struct lldp *lldp;          /* lldp, if any. */
     bool may_enable;            /* May be enabled in bonds. */
     bool is_tunnel;             /* This port is a tunnel. */
     bool is_layer3;             /* This is a layer 3 port. */
@@ -161,6 +184,10 @@ struct ofport_dpif {
     enum stp_state stp_state;   /* Always STP_DISABLED if STP not in use. */
     long long int stp_state_entered;
 
+    /* Rapid Spanning Tree. */
+    struct rstp_port *rstp_port; /* Rapid Spanning Tree Protocol, if any. */
+    enum rstp_state rstp_state; /* Always RSTP_DISABLED if RSTP not in use. */
+
     /* Queue to DSCP mapping. */
     struct ofproto_port_queue *qdscp;
     size_t n_qdscp;
@@ -207,13 +234,9 @@ ofport_dpif_cast(const struct ofport *ofport)
 static void port_run(struct ofport_dpif *);
 static int set_bfd(struct ofport *, const struct smap *);
 static int set_cfm(struct ofport *, const struct cfm_settings *);
+static int set_lldp(struct ofport *ofport_, const struct smap *cfg);
 static void ofport_update_peer(struct ofport_dpif *);
 
-struct dpif_completion {
-    struct list list_node;
-    struct ofoperation *op;
-};
-
 /* Reasons that we might need to revalidate every datapath flow, and
  * corresponding coverage counters.
  *
@@ -225,17 +248,21 @@ struct dpif_completion {
 enum revalidate_reason {
     REV_RECONFIGURE = 1,       /* Switch configuration changed. */
     REV_STP,                   /* Spanning tree protocol port status change. */
+    REV_RSTP,                  /* RSTP port status change. */
     REV_BOND,                  /* Bonding changed. */
     REV_PORT_TOGGLED,          /* Port enabled or disabled by CFM, LACP, ...*/
     REV_FLOW_TABLE,            /* Flow table changed. */
     REV_MAC_LEARNING,          /* Mac learning changed. */
+    REV_MCAST_SNOOPING,        /* Multicast snooping changed. */
 };
 COVERAGE_DEFINE(rev_reconfigure);
 COVERAGE_DEFINE(rev_stp);
+COVERAGE_DEFINE(rev_rstp);
 COVERAGE_DEFINE(rev_bond);
 COVERAGE_DEFINE(rev_port_toggled);
 COVERAGE_DEFINE(rev_flow_table);
 COVERAGE_DEFINE(rev_mac_learning);
+COVERAGE_DEFINE(rev_mcast_snooping);
 
 /* All datapaths of a given type share a single dpif backer instance. */
 struct dpif_backer {
@@ -253,18 +280,12 @@ struct dpif_backer {
 
     bool recv_set_enable; /* Enables or disables receiving packets. */
 
-    /* Recirculation. */
-    struct recirc_id_pool *rid_pool;       /* Recirculation ID pool. */
-    bool enable_recirc;   /* True if the datapath supports recirculation */
-
-    /* True if the datapath supports variable-length
-     * OVS_USERSPACE_ATTR_USERDATA in OVS_ACTION_ATTR_USERSPACE actions.
-     * False if the datapath supports only 8-byte (or shorter) userdata. */
-    bool variable_length_userdata;
+    /* Version string of the datapath stored in OVSDB. */
+    char *dp_version_string;
 
-    /* Maximum number of MPLS label stack entries that the datapath supports
-     * in a match */
-    size_t max_mpls_depth;
+    /* Datapath feature support. */
+    struct dpif_backer_support support;
+    struct atomic_count tnl_count;
 };
 
 /* All existing ofproto_backer instances, indexed by ofproto->up.type. */
@@ -275,12 +296,18 @@ struct ofproto_dpif {
     struct ofproto up;
     struct dpif_backer *backer;
 
+    /* Unique identifier for this instantiation of this bridge in this running
+     * process.  */
+    struct uuid uuid;
+
+    ATOMIC(cls_version_t) tables_version;  /* For classifier lookups. */
+
     uint64_t dump_seq; /* Last read of udpif_dump_seq(). */
 
     /* Special OpenFlow rules. */
     struct rule_dpif *miss_rule; /* Sends flow table misses to controller. */
     struct rule_dpif *no_packet_in_rule; /* Drops flow table misses. */
-    struct rule_dpif *drop_frags_rule; /* Used in OFPC_FRAG_DROP mode. */
+    struct rule_dpif *drop_frags_rule; /* Used in OFPUTIL_FRAG_DROP mode. */
 
     /* Bridging. */
     struct netflow *netflow;
@@ -288,6 +315,7 @@ struct ofproto_dpif {
     struct dpif_ipfix *ipfix;
     struct hmap bundles;        /* Contains "struct ofbundle"s. */
     struct mac_learning *ml;
+    struct mcast_snooping *ms;
     bool has_bonded_bundles;
     bool lacp_enabled;
     struct mbridge *mbridge;
@@ -300,6 +328,10 @@ struct ofproto_dpif {
     struct stp *stp;
     long long int stp_last_tick;
 
+    /* Rapid Spanning Tree. */
+    struct rstp *rstp;
+    long long int rstp_last_tick;
+
     /* VLAN splinters. */
     struct ovs_mutex vsp_mutex;
     struct hmap realdev_vid_map OVS_GUARDED; /* (realdev,vid) -> vlandev. */
@@ -313,13 +345,16 @@ struct ofproto_dpif {
     uint64_t change_seq;           /* Connectivity status changes. */
 
     /* Work queues. */
-    struct guarded_list pins;      /* Contains "struct ofputil_packet_in"s. */
+    struct guarded_list ams;      /* Contains "struct ofproto_async_msgs"s. */
+    struct seq *ams_seq;          /* For notifying 'ams' reception. */
+    uint64_t ams_seqno;
 };
 
 /* All existing ofproto_dpif instances, indexed by ->up.name. */
 static struct hmap all_ofproto_dpifs = HMAP_INITIALIZER(&all_ofproto_dpifs);
 
-static void ofproto_dpif_unixctl_init(void);
+static bool ofproto_use_tnl_push_pop = true;
+static void ofproto_unixctl_init(void);
 
 static inline struct ofproto_dpif *
 ofproto_dpif_cast(const struct ofproto *ofproto)
@@ -328,22 +363,20 @@ ofproto_dpif_cast(const struct ofproto *ofproto)
     return CONTAINER_OF(ofproto, struct ofproto_dpif, up);
 }
 
-size_t
-ofproto_dpif_get_max_mpls_depth(const struct ofproto_dpif *ofproto)
+bool
+ofproto_dpif_get_enable_ufid(const struct dpif_backer *backer)
 {
-    return ofproto->backer->max_mpls_depth;
+    return backer->support.ufid;
 }
 
-bool
-ofproto_dpif_get_enable_recirc(const struct ofproto_dpif *ofproto)
+struct dpif_backer_support *
+ofproto_dpif_get_support(const struct ofproto_dpif *ofproto)
 {
-    return ofproto->backer->enable_recirc;
+    return &ofproto->backer->support;
 }
 
-static struct ofport_dpif *get_ofp_port(const struct ofproto_dpif *ofproto,
-                                        ofp_port_t ofp_port);
 static void ofproto_trace(struct ofproto_dpif *, struct flow *,
-                          const struct ofpbuf *packet,
+                          const struct dp_packet *packet,
                           const struct ofpact[], size_t ofpacts_len,
                           struct ds *);
 
@@ -357,34 +390,33 @@ static struct shash init_ofp_ports = SHASH_INITIALIZER(&init_ofp_ports);
  * it. */
 void
 ofproto_dpif_flow_mod(struct ofproto_dpif *ofproto,
-                      struct ofputil_flow_mod *fm)
+                      const struct ofputil_flow_mod *fm)
 {
-    ofproto_flow_mod(&ofproto->up, fm);
-}
+    struct ofproto_flow_mod ofm;
 
-/* Resets the modified time for 'rule' or an equivalent rule. If 'rule' is not
- * in the classifier, but an equivalent rule is, unref 'rule' and ref the new
- * rule. Otherwise if 'rule' is no longer installed in the classifier,
- * reinstall it.
- *
- * Returns the rule whose modified time has been reset. */
-struct rule_dpif *
-ofproto_dpif_refresh_rule(struct rule_dpif *rule)
-{
-    return rule_dpif_cast(ofproto_refresh_rule(&rule->up));
+    /* Multiple threads may do this for the same 'fm' at the same time.
+     * Allocate ofproto_flow_mod with execution context from stack.
+     *
+     * Note: This copy could be avoided by making ofproto_flow_mod more
+     * complex, but that may not be desireable, and a learn action is not that
+     * fast to begin with. */
+    ofm.fm = *fm;
+    ofproto_flow_mod(&ofproto->up, &ofm);
 }
 
-/* Appends 'pin' to the queue of "packet ins" to be sent to the controller.
- * Takes ownership of 'pin' and pin->packet. */
+/* Appends 'am' to the queue of asynchronous messages to be sent to the
+ * controller.  Takes ownership of 'am' and any data it points to. */
 void
-ofproto_dpif_send_packet_in(struct ofproto_dpif *ofproto,
-                            struct ofproto_packet_in *pin)
+ofproto_dpif_send_async_msg(struct ofproto_dpif *ofproto,
+                            struct ofproto_async_msg *am)
 {
-    if (!guarded_list_push_back(&ofproto->pins, &pin->list_node, 1024)) {
+    if (!guarded_list_push_back(&ofproto->ams, &am->list_node, 1024)) {
         COVERAGE_INC(packet_in_overflow);
-        free(CONST_CAST(void *, pin->up.packet));
-        free(pin);
+        ofproto_async_msg_free(am);
     }
+
+    /* Wakes up main thread for packet-in I/O. */
+    seq_change(ofproto->ams_seq);
 }
 
 /* The default "table-miss" behaviour for OpenFlow1.3+ is to drop the
@@ -417,6 +449,9 @@ init(const struct shash *iface_hints)
 
         shash_add(&init_ofp_ports, node->name, new_hint);
     }
+
+    ofproto_unixctl_init();
+    udpif_init();
 }
 
 static void
@@ -483,6 +518,12 @@ lookup_ofproto_dpif_by_port_name(const char *name)
     return NULL;
 }
 
+bool
+ofproto_dpif_backer_enabled(struct dpif_backer* backer)
+{
+    return backer->recv_set_enable;
+}
+
 static int
 type_run(const char *type)
 {
@@ -495,7 +536,12 @@ type_run(const char *type)
         return 0;
     }
 
-    dpif_run(backer->dpif);
+
+    if (dpif_run(backer->dpif)) {
+        backer->need_revalidate = REV_RECONFIGURE;
+    }
+
+    udpif_run(backer->udpif);
 
     /* If vswitchd started with other_config:flow_restore_wait set as "true",
      * and the configuration has now changed to "false", enable receiving
@@ -518,6 +564,8 @@ type_run(const char *type)
         udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
     }
 
+    dpif_poll_threads_set(backer->dpif, pmd_cpu_mask);
+
     if (backer->need_revalidate) {
         struct ofproto_dpif *ofproto;
         struct simap_node *node;
@@ -565,7 +613,8 @@ type_run(const char *type)
 
                 iter->odp_port = node ? u32_to_odp(node->data) : ODPP_NONE;
                 if (tnl_port_reconfigure(iter, iter->up.netdev,
-                                         iter->odp_port)) {
+                                         iter->odp_port,
+                                         ovs_native_tunneling_is_on(ofproto), dp_port)) {
                     backer->need_revalidate = REV_RECONFIGURE;
                 }
             }
@@ -577,12 +626,14 @@ type_run(const char *type)
         simap_destroy(&tmp_backers);
 
         switch (backer->need_revalidate) {
-        case REV_RECONFIGURE:   COVERAGE_INC(rev_reconfigure);   break;
-        case REV_STP:           COVERAGE_INC(rev_stp);           break;
-        case REV_BOND:          COVERAGE_INC(rev_bond);          break;
-        case REV_PORT_TOGGLED:  COVERAGE_INC(rev_port_toggled);  break;
-        case REV_FLOW_TABLE:    COVERAGE_INC(rev_flow_table);    break;
-        case REV_MAC_LEARNING:  COVERAGE_INC(rev_mac_learning);  break;
+        case REV_RECONFIGURE:    COVERAGE_INC(rev_reconfigure);    break;
+        case REV_STP:            COVERAGE_INC(rev_stp);            break;
+        case REV_RSTP:           COVERAGE_INC(rev_rstp);           break;
+        case REV_BOND:           COVERAGE_INC(rev_bond);           break;
+        case REV_PORT_TOGGLED:   COVERAGE_INC(rev_port_toggled);   break;
+        case REV_FLOW_TABLE:     COVERAGE_INC(rev_flow_table);     break;
+        case REV_MAC_LEARNING:   COVERAGE_INC(rev_mac_learning);   break;
+        case REV_MCAST_SNOOPING: COVERAGE_INC(rev_mcast_snooping); break;
         }
         backer->need_revalidate = 0;
 
@@ -594,18 +645,15 @@ type_run(const char *type)
                 continue;
             }
 
-            ovs_rwlock_wrlock(&xlate_rwlock);
+            xlate_txn_start();
             xlate_ofproto_set(ofproto, ofproto->up.name,
-                              ofproto->backer->dpif, ofproto->miss_rule,
-                              ofproto->no_packet_in_rule, ofproto->ml,
-                              ofproto->stp, ofproto->mbridge,
-                              ofproto->sflow, ofproto->ipfix,
-                              ofproto->netflow, ofproto->up.frag_handling,
+                              ofproto->backer->dpif, ofproto->ml,
+                              ofproto->stp, ofproto->rstp, ofproto->ms,
+                              ofproto->mbridge, ofproto->sflow, ofproto->ipfix,
+                              ofproto->netflow,
                               ofproto->up.forward_bpdu,
                               connmgr_has_in_band(ofproto->up.connmgr),
-                              ofproto->backer->enable_recirc,
-                              ofproto->backer->variable_length_userdata,
-                              ofproto->backer->max_mpls_depth);
+                              &ofproto->backer->support);
 
             HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
                 xlate_bundle_set(ofproto, bundle, bundle->name,
@@ -621,13 +669,14 @@ type_run(const char *type)
                     : -1;
                 xlate_ofport_set(ofproto, ofport->bundle, ofport,
                                  ofport->up.ofp_port, ofport->odp_port,
-                                 ofport->up.netdev, ofport->cfm,
-                                 ofport->bfd, ofport->peer, stp_port,
-                                 ofport->qdscp, ofport->n_qdscp,
-                                 ofport->up.pp.config, ofport->up.pp.state,
-                                 ofport->is_tunnel, ofport->may_enable);
+                                 ofport->up.netdev, ofport->cfm, ofport->bfd,
+                                 ofport->lldp, ofport->peer, stp_port,
+                                 ofport->rstp_port, ofport->qdscp,
+                                 ofport->n_qdscp, ofport->up.pp.config,
+                                 ofport->up.pp.state, ofport->is_tunnel,
+                                 ofport->may_enable);
             }
-            ovs_rwlock_unlock(&xlate_rwlock);
+            xlate_txn_commit();
         }
 
         udpif_revalidate(backer->udpif);
@@ -789,7 +838,7 @@ static int add_internal_flows(struct ofproto_dpif *);
 static struct ofproto *
 alloc(void)
 {
-    struct ofproto_dpif *ofproto = xmalloc(sizeof *ofproto);
+    struct ofproto_dpif *ofproto = xzalloc(sizeof *ofproto);
     return &ofproto->up;
 }
 
@@ -815,21 +864,20 @@ close_dpif_backer(struct dpif_backer *backer)
     ovs_rwlock_destroy(&backer->odp_to_ofport_lock);
     hmap_destroy(&backer->odp_to_ofport_map);
     shash_find_and_delete(&all_dpif_backers, backer->type);
-    recirc_id_pool_destroy(backer->rid_pool);
     free(backer->type);
+    free(backer->dp_version_string);
     dpif_close(backer->dpif);
     free(backer);
 }
 
 /* Datapath port slated for removal from datapath. */
 struct odp_garbage {
-    struct list list_node;
+    struct ovs_list list_node;
     odp_port_t odp_port;
 };
 
 static bool check_variable_length_userdata(struct dpif_backer *backer);
-static size_t check_max_mpls_depth(struct dpif_backer *backer);
-static bool check_recirc(struct dpif_backer *backer);
+static void check_support(struct dpif_backer *backer);
 
 static int
 open_dpif_backer(const char *type, struct dpif_backer **backerp)
@@ -838,14 +886,16 @@ open_dpif_backer(const char *type, struct dpif_backer **backerp)
     struct dpif_port_dump port_dump;
     struct dpif_port port;
     struct shash_node *node;
-    struct list garbage_list;
-    struct odp_garbage *garbage, *next;
+    struct ovs_list garbage_list;
+    struct odp_garbage *garbage;
 
     struct sset names;
     char *backer_name;
     const char *name;
     int error;
 
+    recirc_init();
+
     backer = shash_find_data(&all_dpif_backers, type);
     if (backer) {
         backer->refcount++;
@@ -915,14 +965,16 @@ open_dpif_backer(const char *type, struct dpif_backer **backerp)
     }
     dpif_port_dump_done(&port_dump);
 
-    LIST_FOR_EACH_SAFE (garbage, next, list_node, &garbage_list) {
+    LIST_FOR_EACH_POP (garbage, list_node, &garbage_list) {
         dpif_port_del(backer->dpif, garbage->odp_port);
-        list_remove(&garbage->list_node);
         free(garbage);
     }
 
     shash_add(&all_dpif_backers, type, backer);
 
+    check_support(backer);
+    atomic_count_init(&backer->tnl_count, 0);
+
     error = dpif_recv_set(backer->dpif, backer->recv_set_enable);
     if (error) {
         VLOG_ERR("failed to listen on datapath of type %s: %s",
@@ -930,21 +982,31 @@ open_dpif_backer(const char *type, struct dpif_backer **backerp)
         close_dpif_backer(backer);
         return error;
     }
-    backer->enable_recirc = check_recirc(backer);
-    backer->variable_length_userdata = check_variable_length_userdata(backer);
-    backer->max_mpls_depth = check_max_mpls_depth(backer);
-    backer->rid_pool = recirc_id_pool_create();
 
     if (backer->recv_set_enable) {
         udpif_set_threads(backer->udpif, n_handlers, n_revalidators);
     }
 
+    /* This check fails if performed before udpif threads have been set,
+     * as the kernel module checks that the 'pid' in userspace action
+     * is non-zero. */
+    backer->support.variable_length_userdata
+        = check_variable_length_userdata(backer);
+    backer->dp_version_string = dpif_get_dp_version(backer->dpif);
+
     return error;
 }
 
-/* Tests whether 'backer''s datapath supports recirculation Only newer datapath
- * supports OVS_KEY_ATTR in OVS_ACTION_ATTR_USERSPACE actions.  We need to
- * disable some features on older datapaths that don't support this feature.
+bool
+ovs_native_tunneling_is_on(struct ofproto_dpif *ofproto)
+{
+    return ofproto_use_tnl_push_pop && ofproto->backer->support.tnl_push_pop &&
+           atomic_count_get(&ofproto->backer->tnl_count);
+}
+
+/* Tests whether 'backer''s datapath supports recirculation.  Only newer
+ * datapaths support OVS_KEY_ATTR_RECIRC_ID in keys.  We need to disable some
+ * features on older datapaths that don't support this feature.
  *
  * Returns false if 'backer' definitely does not support recirculation, true if
  * it seems to support recirculation or if at least the error we get is
@@ -955,37 +1017,23 @@ check_recirc(struct dpif_backer *backer)
     struct flow flow;
     struct odputil_keybuf keybuf;
     struct ofpbuf key;
-    int error;
-    bool enable_recirc = false;
+    bool enable_recirc;
+    struct odp_flow_key_parms odp_parms = {
+        .flow = &flow,
+        .support = {
+            .recirc = true,
+        },
+    };
 
     memset(&flow, 0, sizeof flow);
     flow.recirc_id = 1;
     flow.dp_hash = 1;
 
     ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
-    odp_flow_key_from_flow(&key, &flow, NULL, 0);
-
-    error = dpif_flow_put(backer->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
-                          ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, NULL,
-                          0, NULL);
-    if (error && error != EEXIST) {
-        if (error != EINVAL) {
-            VLOG_WARN("%s: Reciculation flow probe failed (%s)",
-                      dpif_name(backer->dpif), ovs_strerror(error));
-        }
-        goto done;
-    }
+    odp_flow_key_from_flow(&odp_parms, &key);
+    enable_recirc = dpif_probe_feature(backer->dpif, "recirculation", &key,
+                                       NULL);
 
-    error = dpif_flow_del(backer->dpif, ofpbuf_data(&key), ofpbuf_size(&key),
-                          NULL);
-    if (error) {
-        VLOG_WARN("%s: failed to delete recirculation feature probe flow",
-                  dpif_name(backer->dpif));
-    }
-
-    enable_recirc = true;
-
-done:
     if (enable_recirc) {
         VLOG_INFO("%s: Datapath supports recirculation",
                   dpif_name(backer->dpif));
@@ -997,6 +1045,42 @@ done:
     return enable_recirc;
 }
 
+/* Tests whether 'dpif' supports unique flow ids. We can skip serializing
+ * some flow attributes for datapaths that support this feature.
+ *
+ * Returns true if 'dpif' supports UFID for flow operations.
+ * Returns false if  'dpif' does not support UFID. */
+static bool
+check_ufid(struct dpif_backer *backer)
+{
+    struct flow flow;
+    struct odputil_keybuf keybuf;
+    struct ofpbuf key;
+    ovs_u128 ufid;
+    bool enable_ufid;
+    struct odp_flow_key_parms odp_parms = {
+        .flow = &flow,
+    };
+
+    memset(&flow, 0, sizeof flow);
+    flow.dl_type = htons(0x1234);
+
+    ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
+    odp_flow_key_from_flow(&odp_parms, &key);
+    dpif_flow_hash(backer->dpif, key.data, key.size, &ufid);
+
+    enable_ufid = dpif_probe_feature(backer->dpif, "UFID", &key, &ufid);
+
+    if (enable_ufid) {
+        VLOG_INFO("%s: Datapath supports unique flow ids",
+                  dpif_name(backer->dpif));
+    } else {
+        VLOG_INFO("%s: Datapath does not support unique flow ids",
+                  dpif_name(backer->dpif));
+    }
+    return enable_ufid;
+}
+
 /* Tests whether 'backer''s datapath supports variable-length
  * OVS_USERSPACE_ATTR_USERDATA in OVS_ACTION_ATTR_USERSPACE actions.  We need
  * to disable some features on older datapaths that don't support this
@@ -1011,7 +1095,7 @@ check_variable_length_userdata(struct dpif_backer *backer)
     struct eth_header *eth;
     struct ofpbuf actions;
     struct dpif_execute execute;
-    struct ofpbuf packet;
+    struct dp_packet packet;
     size_t start;
     int error;
 
@@ -1030,30 +1114,26 @@ check_variable_length_userdata(struct dpif_backer *backer)
     nl_msg_end_nested(&actions, start);
 
     /* Compose a dummy ethernet packet. */
-    ofpbuf_init(&packet, ETH_HEADER_LEN);
-    eth = ofpbuf_put_zeros(&packet, ETH_HEADER_LEN);
+    dp_packet_init(&packet, ETH_HEADER_LEN);
+    eth = dp_packet_put_zeros(&packet, ETH_HEADER_LEN);
     eth->eth_type = htons(0x1234);
 
     /* Execute the actions.  On older datapaths this fails with ERANGE, on
      * newer datapaths it succeeds. */
-    execute.actions = ofpbuf_data(&actions);
-    execute.actions_len = ofpbuf_size(&actions);
+    execute.actions = actions.data;
+    execute.actions_len = actions.size;
     execute.packet = &packet;
-    execute.md = PKT_METADATA_INITIALIZER(0);
     execute.needs_help = false;
+    execute.probe = true;
+    execute.mtu = 0;
 
     error = dpif_execute(backer->dpif, &execute);
 
-    ofpbuf_uninit(&packet);
+    dp_packet_uninit(&packet);
     ofpbuf_uninit(&actions);
 
     switch (error) {
     case 0:
-        /* Variable-length userdata is supported.
-         *
-         * Purge received packets to avoid processing the nonsense packet we
-         * sent to userspace, then report success. */
-        dpif_recv_purge(backer->dpif);
         return true;
 
     case ERANGE:
@@ -1089,30 +1169,19 @@ check_max_mpls_depth(struct dpif_backer *backer)
     for (n = 0; n < FLOW_MAX_MPLS_LABELS; n++) {
         struct odputil_keybuf keybuf;
         struct ofpbuf key;
-        int error;
+        struct odp_flow_key_parms odp_parms = {
+            .flow = &flow,
+        };
 
         memset(&flow, 0, sizeof flow);
         flow.dl_type = htons(ETH_TYPE_MPLS);
         flow_set_mpls_bos(&flow, n, 1);
 
         ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);
-        odp_flow_key_from_flow(&key, &flow, NULL, 0);
-
-        error = dpif_flow_put(backer->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
-                              ofpbuf_data(&key), ofpbuf_size(&key), NULL, 0, NULL, 0, NULL);
-        if (error && error != EEXIST) {
-            if (error != EINVAL) {
-                VLOG_WARN("%s: MPLS stack length feature probe failed (%s)",
-                          dpif_name(backer->dpif), ovs_strerror(error));
-            }
+        odp_flow_key_from_flow(&odp_parms, &key);
+        if (!dpif_probe_feature(backer->dpif, "MPLS", &key, NULL)) {
             break;
         }
-
-        error = dpif_flow_del(backer->dpif, ofpbuf_data(&key), ofpbuf_size(&key), NULL);
-        if (error) {
-            VLOG_WARN("%s: failed to delete MPLS feature probe flow",
-                      dpif_name(backer->dpif));
-        }
     }
 
     VLOG_INFO("%s: MPLS label stack length probed as %d",
@@ -1120,6 +1189,118 @@ check_max_mpls_depth(struct dpif_backer *backer)
     return n;
 }
 
+/* Tests whether 'backer''s datapath supports masked data in
+ * OVS_ACTION_ATTR_SET actions.  We need to disable some features on older
+ * datapaths that don't support this feature. */
+static bool
+check_masked_set_action(struct dpif_backer *backer)
+{
+    struct eth_header *eth;
+    struct ofpbuf actions;
+    struct dpif_execute execute;
+    struct dp_packet packet;
+    int error;
+    struct ovs_key_ethernet key, mask;
+
+    /* Compose a set action that will cause an EINVAL error on older
+     * datapaths that don't support masked set actions.
+     * Avoid using a full mask, as it could be translated to a non-masked
+     * set action instead. */
+    ofpbuf_init(&actions, 64);
+    memset(&key, 0x53, sizeof key);
+    memset(&mask, 0x7f, sizeof mask);
+    commit_masked_set_action(&actions, OVS_KEY_ATTR_ETHERNET, &key, &mask,
+                             sizeof key);
+
+    /* Compose a dummy ethernet packet. */
+    dp_packet_init(&packet, ETH_HEADER_LEN);
+    eth = dp_packet_put_zeros(&packet, ETH_HEADER_LEN);
+    eth->eth_type = htons(0x1234);
+
+    /* Execute the actions.  On older datapaths this fails with EINVAL, on
+     * newer datapaths it succeeds. */
+    execute.actions = actions.data;
+    execute.actions_len = actions.size;
+    execute.packet = &packet;
+    execute.needs_help = false;
+    execute.probe = true;
+    execute.mtu = 0;
+
+    error = dpif_execute(backer->dpif, &execute);
+
+    dp_packet_uninit(&packet);
+    ofpbuf_uninit(&actions);
+
+    if (error) {
+        /* Masked set action is not supported. */
+        VLOG_INFO("%s: datapath does not support masked set action feature.",
+                  dpif_name(backer->dpif));
+    }
+    return !error;
+}
+
+#define CHECK_FEATURE__(NAME, SUPPORT, FIELD, VALUE)                        \
+static bool                                                                 \
+check_##NAME(struct dpif_backer *backer)                                    \
+{                                                                           \
+    struct flow flow;                                                       \
+    struct odputil_keybuf keybuf;                                           \
+    struct ofpbuf key;                                                      \
+    bool enable;                                                            \
+    struct odp_flow_key_parms odp_parms = {                                 \
+        .flow = &flow,                                                      \
+        .support = {                                                        \
+            .SUPPORT = true,                                                \
+        },                                                                  \
+    };                                                                      \
+                                                                            \
+    memset(&flow, 0, sizeof flow);                                          \
+    flow.FIELD = VALUE;                                                     \
+                                                                            \
+    ofpbuf_use_stack(&key, &keybuf, sizeof keybuf);                         \
+    odp_flow_key_from_flow(&odp_parms, &key);                               \
+    enable = dpif_probe_feature(backer->dpif, #NAME, &key, NULL);           \
+                                                                            \
+    if (enable) {                                                           \
+        VLOG_INFO("%s: Datapath supports "#NAME, dpif_name(backer->dpif));  \
+    } else {                                                                \
+        VLOG_INFO("%s: Datapath does not support "#NAME,                    \
+                  dpif_name(backer->dpif));                                 \
+    }                                                                       \
+                                                                            \
+    return enable;                                                          \
+}
+#define CHECK_FEATURE(FIELD) CHECK_FEATURE__(FIELD, FIELD, FIELD, 1)
+
+CHECK_FEATURE(ct_state)
+CHECK_FEATURE(ct_zone)
+CHECK_FEATURE(ct_mark)
+CHECK_FEATURE__(ct_label, ct_label, ct_label.u64.lo, 1)
+CHECK_FEATURE__(ct_state_nat, ct_state, ct_state, CS_TRACKED|CS_SRC_NAT)
+
+#undef CHECK_FEATURE
+#undef CHECK_FEATURE__
+
+static void
+check_support(struct dpif_backer *backer)
+{
+    /* This feature needs to be tested after udpif threads are set. */
+    backer->support.variable_length_userdata = false;
+
+    backer->support.odp.recirc = check_recirc(backer);
+    backer->support.odp.max_mpls_depth = check_max_mpls_depth(backer);
+    backer->support.masked_set_action = check_masked_set_action(backer);
+    backer->support.ufid = check_ufid(backer);
+    backer->support.tnl_push_pop = dpif_supports_tnl_push_pop(backer->dpif);
+
+    backer->support.odp.ct_state = check_ct_state(backer);
+    backer->support.odp.ct_zone = check_ct_zone(backer);
+    backer->support.odp.ct_mark = check_ct_mark(backer);
+    backer->support.odp.ct_label = check_ct_label(backer);
+
+    backer->support.odp.ct_state_nat = check_ct_state_nat(backer);
+}
+
 static int
 construct(struct ofproto *ofproto_)
 {
@@ -1127,27 +1308,32 @@ construct(struct ofproto *ofproto_)
     struct shash_node *node, *next;
     int error;
 
+    /* Tunnel module can get used right after the udpif threads are running. */
+    ofproto_tunnel_init();
+
     error = open_dpif_backer(ofproto->up.type, &ofproto->backer);
     if (error) {
         return error;
     }
 
+    uuid_generate(&ofproto->uuid);
+    atomic_init(&ofproto->tables_version, CLS_MIN_VERSION);
     ofproto->netflow = NULL;
     ofproto->sflow = NULL;
     ofproto->ipfix = NULL;
     ofproto->stp = NULL;
+    ofproto->rstp = NULL;
     ofproto->dump_seq = 0;
     hmap_init(&ofproto->bundles);
     ofproto->ml = mac_learning_create(MAC_ENTRY_DEFAULT_IDLE_TIME);
+    ofproto->ms = NULL;
     ofproto->mbridge = mbridge_create();
     ofproto->has_bonded_bundles = false;
     ofproto->lacp_enabled = false;
     ovs_mutex_init_adaptive(&ofproto->stats_mutex);
     ovs_mutex_init(&ofproto->vsp_mutex);
 
-    guarded_list_init(&ofproto->pins);
-
-    ofproto_dpif_unixctl_init();
+    guarded_list_init(&ofproto->ams);
 
     hmap_init(&ofproto->vlandev_map);
     hmap_init(&ofproto->realdev_vid_map);
@@ -1157,6 +1343,9 @@ construct(struct ofproto *ofproto_)
     sset_init(&ofproto->port_poll_set);
     ofproto->port_poll_errno = 0;
     ofproto->change_seq = 0;
+    ofproto->ams_seq = seq_create();
+    ofproto->ams_seqno = seq_read(ofproto->ams_seq);
+
 
     SHASH_FOR_EACH_SAFE (node, next, &init_ofp_ports) {
         struct iface_hint *iface_hint = node->data;
@@ -1197,7 +1386,8 @@ add_internal_miss_flow(struct ofproto_dpif *ofproto, int id,
     match_init_catchall(&match);
     match_set_reg(&match, 0, id);
 
-    error = ofproto_dpif_add_internal_flow(ofproto, &match, 0, ofpacts, &rule);
+    error = ofproto_dpif_add_internal_flow(ofproto, &match, 0, 0, ofpacts,
+                                           &rule);
     *rulep = error ? NULL : rule_dpif_cast(rule);
 
     return error;
@@ -1210,7 +1400,6 @@ add_internal_flows(struct ofproto_dpif *ofproto)
     uint64_t ofpacts_stub[128 / 8];
     struct ofpbuf ofpacts;
     struct rule *unused_rulep OVS_UNUSED;
-    struct ofpact_resubmit *resubmit;
     struct match match;
     int error;
     int id;
@@ -1221,8 +1410,8 @@ add_internal_flows(struct ofproto_dpif *ofproto)
     controller = ofpact_put_CONTROLLER(&ofpacts);
     controller->max_len = UINT16_MAX;
     controller->controller_id = 0;
-    controller->reason = OFPR_NO_MATCH;
-    ofpact_pad(&ofpacts);
+    controller->reason = OFPR_IMPLICIT_MISS;
+    ofpact_finish(&ofpacts, &controller->ofpact);
 
     error = add_internal_miss_flow(ofproto, id++, &ofpacts,
                                    &ofproto->miss_rule);
@@ -1232,45 +1421,27 @@ add_internal_flows(struct ofproto_dpif *ofproto)
 
     ofpbuf_clear(&ofpacts);
     error = add_internal_miss_flow(ofproto, id++, &ofpacts,
-                              &ofproto->no_packet_in_rule);
+                                   &ofproto->no_packet_in_rule);
     if (error) {
         return error;
     }
 
     error = add_internal_miss_flow(ofproto, id++, &ofpacts,
-                              &ofproto->drop_frags_rule);
-    if (error) {
-        return error;
-    }
-
-    /* Continue non-recirculation rule lookups from table 0.
-     *
-     * (priority=2), recirc=0, actions=resubmit(, 0)
-     */
-    resubmit = ofpact_put_RESUBMIT(&ofpacts);
-    resubmit->ofpact.compat = 0;
-    resubmit->in_port = OFPP_IN_PORT;
-    resubmit->table_id = 0;
-
-    match_init_catchall(&match);
-    match_set_recirc_id(&match, 0);
-
-    error = ofproto_dpif_add_internal_flow(ofproto, &match, 2,  &ofpacts,
-                                           &unused_rulep);
+                                   &ofproto->drop_frags_rule);
     if (error) {
         return error;
     }
 
-    /* Drop any run away recirc rule lookups. Recirc_id has to be
-     * non-zero when reaching this rule.
+    /* Drop any run away non-recirc rule lookups. Recirc_id has to be
+     * zero when reaching this rule.
      *
-     * (priority=1), *, actions=drop
+     * (priority=2), recirc_id=0, actions=drop
      */
     ofpbuf_clear(&ofpacts);
     match_init_catchall(&match);
-    error = ofproto_dpif_add_internal_flow(ofproto, &match, 1,  &ofpacts,
+    match_set_recirc_id(&match, 0);
+    error = ofproto_dpif_add_internal_flow(ofproto, &match, 2, 0, &ofpacts,
                                            &unused_rulep);
-
     return error;
 }
 
@@ -1278,15 +1449,15 @@ static void
 destruct(struct ofproto *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    struct rule_dpif *rule, *next_rule;
-    struct ofproto_packet_in *pin, *next_pin;
+    struct ofproto_async_msg *am;
+    struct rule_dpif *rule;
     struct oftable *table;
-    struct list pins;
+    struct ovs_list ams;
 
     ofproto->backer->need_revalidate = REV_RECONFIGURE;
-    ovs_rwlock_wrlock(&xlate_rwlock);
+    xlate_txn_start();
     xlate_remove_ofproto(ofproto);
-    ovs_rwlock_unlock(&xlate_rwlock);
+    xlate_txn_commit();
 
     /* Ensure that the upcall processing threads have no remaining references
      * to the ofproto or anything in it. */
@@ -1295,30 +1466,28 @@ destruct(struct ofproto *ofproto_)
     hmap_remove(&all_ofproto_dpifs, &ofproto->all_ofproto_dpifs_node);
 
     OFPROTO_FOR_EACH_TABLE (table, &ofproto->up) {
-        struct cls_cursor cursor;
-
-        fat_rwlock_rdlock(&table->cls.rwlock);
-        cls_cursor_init(&cursor, &table->cls, NULL);
-        fat_rwlock_unlock(&table->cls.rwlock);
-        CLS_CURSOR_FOR_EACH_SAFE (rule, next_rule, up.cr, &cursor) {
+        CLS_FOR_EACH (rule, up.cr, &table->cls) {
             ofproto_rule_delete(&ofproto->up, &rule->up);
         }
     }
+    ofproto_group_delete_all(&ofproto->up);
 
-    guarded_list_pop_all(&ofproto->pins, &pins);
-    LIST_FOR_EACH_SAFE (pin, next_pin, list_node, &pins) {
-        list_remove(&pin->list_node);
-        free(CONST_CAST(void *, pin->up.packet));
-        free(pin);
+    guarded_list_pop_all(&ofproto->ams, &ams);
+    LIST_FOR_EACH_POP (am, list_node, &ams) {
+        ofproto_async_msg_free(am);
     }
-    guarded_list_destroy(&ofproto->pins);
+    guarded_list_destroy(&ofproto->ams);
+
+    recirc_free_ofproto(ofproto, ofproto->up.name);
 
     mbridge_unref(ofproto->mbridge);
 
     netflow_unref(ofproto->netflow);
     dpif_sflow_unref(ofproto->sflow);
+    dpif_ipfix_unref(ofproto->ipfix);
     hmap_destroy(&ofproto->bundles);
     mac_learning_unref(ofproto->ml);
+    mcast_snooping_unref(ofproto->ms);
 
     hmap_destroy(&ofproto->vlandev_map);
     hmap_destroy(&ofproto->realdev_vid_map);
@@ -1330,6 +1499,8 @@ destruct(struct ofproto *ofproto_)
     ovs_mutex_destroy(&ofproto->stats_mutex);
     ovs_mutex_destroy(&ofproto->vsp_mutex);
 
+    seq_destroy(ofproto->ams_seq);
+
     close_dpif_backer(ofproto->backer);
 }
 
@@ -1338,27 +1509,31 @@ run(struct ofproto *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
     uint64_t new_seq, new_dump_seq;
-    const bool enable_recirc = ofproto_dpif_get_enable_recirc(ofproto);
 
     if (mbridge_need_revalidate(ofproto->mbridge)) {
         ofproto->backer->need_revalidate = REV_RECONFIGURE;
         ovs_rwlock_wrlock(&ofproto->ml->rwlock);
         mac_learning_flush(ofproto->ml);
         ovs_rwlock_unlock(&ofproto->ml->rwlock);
+        mcast_snooping_mdb_flush(ofproto->ms);
     }
 
+    /* Always updates the ofproto->ams_seqno to avoid frequent wakeup during
+     * flow restore.  Even though nothing is processed during flow restore,
+     * all queued 'ams' will be handled immediately when flow restore
+     * completes. */
+    ofproto->ams_seqno = seq_read(ofproto->ams_seq);
+
     /* Do not perform any periodic activity required by 'ofproto' while
      * waiting for flow restore to complete. */
     if (!ofproto_get_flow_restore_wait()) {
-        struct ofproto_packet_in *pin, *next_pin;
-        struct list pins;
+        struct ofproto_async_msg *am;
+        struct ovs_list ams;
 
-        guarded_list_pop_all(&ofproto->pins, &pins);
-        LIST_FOR_EACH_SAFE (pin, next_pin, list_node, &pins) {
-            connmgr_send_packet_in(ofproto->up.connmgr, pin);
-            list_remove(&pin->list_node);
-            free(CONST_CAST(void *, pin->up.packet));
-            free(pin);
+        guarded_list_pop_all(&ofproto->ams, &ams);
+        LIST_FOR_EACH_POP (am, list_node, &ams) {
+            connmgr_send_async_msg(ofproto->up.connmgr, am);
+            ofproto_async_msg_free(am);
         }
     }
 
@@ -1391,12 +1566,17 @@ run(struct ofproto *ofproto_)
     }
 
     stp_run(ofproto);
+    rstp_run(ofproto);
     ovs_rwlock_wrlock(&ofproto->ml->rwlock);
     if (mac_learning_run(ofproto->ml)) {
         ofproto->backer->need_revalidate = REV_MAC_LEARNING;
     }
     ovs_rwlock_unlock(&ofproto->ml->rwlock);
 
+    if (mcast_snooping_run(ofproto->ms)) {
+        ofproto->backer->need_revalidate = REV_MCAST_SNOOPING;
+    }
+
     new_dump_seq = seq_read(udpif_dump_seq(ofproto->backer->udpif));
     if (ofproto->dump_seq != new_dump_seq) {
         struct rule *rule, *next_rule;
@@ -1416,22 +1596,16 @@ run(struct ofproto *ofproto_)
 
         /* All outstanding data in existing flows has been accounted, so it's a
          * good time to do bond rebalancing. */
-        if (enable_recirc && ofproto->has_bonded_bundles) {
+        if (ofproto->has_bonded_bundles) {
             struct ofbundle *bundle;
 
             HMAP_FOR_EACH (bundle, hmap_node, &ofproto->bundles) {
-                struct bond *bond = bundle->bond;
-
-                if (bond && bond_may_recirc(bond, NULL, NULL)) {
-                    bond_recirculation_account(bond);
-                    if (bond_rebalance(bundle->bond)) {
-                        bond_update_post_recirc_rules(bond, true);
-                    }
+                if (bundle->bond) {
+                    bond_rebalance(bundle->bond);
                 }
             }
         }
     }
-
     return 0;
 }
 
@@ -1463,14 +1637,14 @@ wait(struct ofproto *ofproto_)
     ovs_rwlock_rdlock(&ofproto->ml->rwlock);
     mac_learning_wait(ofproto->ml);
     ovs_rwlock_unlock(&ofproto->ml->rwlock);
+    mcast_snooping_wait(ofproto->ms);
     stp_wait(ofproto);
     if (ofproto->backer->need_revalidate) {
-        /* Shouldn't happen, but if it does just go around again. */
-        VLOG_DBG_RL(&rl, "need revalidate in ofproto_wait_cb()");
         poll_immediate_wake();
     }
 
     seq_wait(udpif_dump_seq(ofproto->backer->udpif), ofproto->dump_seq);
+    seq_wait(ofproto->ams_seq, ofproto->ams_seqno);
 }
 
 static void
@@ -1496,50 +1670,40 @@ flush(struct ofproto *ofproto_)
 }
 
 static void
-get_features(struct ofproto *ofproto_ OVS_UNUSED,
-             bool *arp_match_ip, enum ofputil_action_bitmap *actions)
-{
-    *arp_match_ip = true;
-    *actions = (OFPUTIL_A_OUTPUT |
-                OFPUTIL_A_SET_VLAN_VID |
-                OFPUTIL_A_SET_VLAN_PCP |
-                OFPUTIL_A_STRIP_VLAN |
-                OFPUTIL_A_SET_DL_SRC |
-                OFPUTIL_A_SET_DL_DST |
-                OFPUTIL_A_SET_NW_SRC |
-                OFPUTIL_A_SET_NW_DST |
-                OFPUTIL_A_SET_NW_TOS |
-                OFPUTIL_A_SET_TP_SRC |
-                OFPUTIL_A_SET_TP_DST |
-                OFPUTIL_A_ENQUEUE);
+query_tables(struct ofproto *ofproto,
+             struct ofputil_table_features *features,
+             struct ofputil_table_stats *stats)
+{
+    strcpy(features->name, "classifier");
+
+    if (stats) {
+        int i;
+
+        for (i = 0; i < ofproto->n_tables; i++) {
+            unsigned long missed, matched;
+
+            atomic_read_relaxed(&ofproto->tables[i].n_matched, &matched);
+            atomic_read_relaxed(&ofproto->tables[i].n_missed, &missed);
+
+            stats[i].matched_count = matched;
+            stats[i].lookup_count = matched + missed;
+        }
+    }
 }
 
 static void
-get_tables(struct ofproto *ofproto_, struct ofp12_table_stats *ots)
+set_tables_version(struct ofproto *ofproto_, cls_version_t version)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    struct dpif_dp_stats s;
-    uint64_t n_miss, n_no_pkt_in, n_bytes, n_dropped_frags;
-    uint64_t n_lookup;
-    long long int used;
 
-    strcpy(ots->name, "classifier");
-
-    dpif_get_dp_stats(ofproto->backer->dpif, &s);
-    rule_get_stats(&ofproto->miss_rule->up, &n_miss, &n_bytes, &used);
-    rule_get_stats(&ofproto->no_packet_in_rule->up, &n_no_pkt_in, &n_bytes,
-                   &used);
-    rule_get_stats(&ofproto->drop_frags_rule->up, &n_dropped_frags, &n_bytes,
-                   &used);
-    n_lookup = s.n_hit + s.n_missed - n_dropped_frags;
-    ots->lookup_count = htonll(n_lookup);
-    ots->matched_count = htonll(n_lookup - n_miss - n_no_pkt_in);
+    atomic_store_relaxed(&ofproto->tables_version, version);
 }
 
+
 static struct ofport *
 port_alloc(void)
 {
-    struct ofport_dpif *port = xmalloc(sizeof *port);
+    struct ofport_dpif *port = xzalloc(sizeof *port);
     return &port->up;
 }
 
@@ -1557,6 +1721,7 @@ port_construct(struct ofport *port_)
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(port->up.ofproto);
     const struct netdev *netdev = port->up.netdev;
     char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
+    const char *dp_port_name;
     struct dpif_port dpif_port;
     int error;
 
@@ -1564,9 +1729,12 @@ port_construct(struct ofport *port_)
     port->bundle = NULL;
     port->cfm = NULL;
     port->bfd = NULL;
-    port->may_enable = true;
+    port->lldp = NULL;
+    port->may_enable = false;
     port->stp_port = NULL;
     port->stp_state = STP_DISABLED;
+    port->rstp_port = NULL;
+    port->rstp_state = RSTP_DISABLED;
     port->is_tunnel = false;
     port->peer = NULL;
     port->qdscp = NULL;
@@ -1578,18 +1746,17 @@ port_construct(struct ofport *port_)
 
     if (netdev_vport_is_patch(netdev)) {
         /* By bailing out here, we don't submit the port to the sFlow module
-        * to be considered for counter polling export.  This is correct
-        * because the patch port represents an interface that sFlow considers
-        * to be "internal" to the switch as a whole, and therefore not an
-        * candidate for counter polling. */
+         * to be considered for counter polling export.  This is correct
+         * because the patch port represents an interface that sFlow considers
+         * to be "internal" to the switch as a whole, and therefore not a
+         * candidate for counter polling. */
         port->odp_port = ODPP_NONE;
         ofport_update_peer(port);
         return 0;
     }
 
-    error = dpif_port_query_by_name(ofproto->backer->dpif,
-                                    netdev_vport_get_dpif_port(netdev, namebuf,
-                                                               sizeof namebuf),
+    dp_port_name = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
+    error = dpif_port_query_by_name(ofproto->backer->dpif, dp_port_name,
                                     &dpif_port);
     if (error) {
         return error;
@@ -1598,8 +1765,19 @@ port_construct(struct ofport *port_)
     port->odp_port = dpif_port.port_no;
 
     if (netdev_get_tunnel_config(netdev)) {
-        tnl_port_add(port, port->up.netdev, port->odp_port);
+        atomic_count_inc(&ofproto->backer->tnl_count);
+        error = tnl_port_add(port, port->up.netdev, port->odp_port,
+                             ovs_native_tunneling_is_on(ofproto), dp_port_name);
+        if (error) {
+            atomic_count_dec(&ofproto->backer->tnl_count);
+            dpif_port_destroy(&dpif_port);
+            return error;
+        }
+
         port->is_tunnel = true;
+        if (ofproto->ipfix) {
+           dpif_ipfix_add_tunnel_port(ofproto->ipfix, port_, port->odp_port);
+        }
     } else {
         /* Sanity-check that a mapping doesn't already exist.  This
          * shouldn't happen for non-tunnel ports. */
@@ -1625,7 +1803,7 @@ port_construct(struct ofport *port_)
 }
 
 static void
-port_destruct(struct ofport *port_)
+port_destruct(struct ofport *port_, bool del)
 {
     struct ofport_dpif *port = ofport_dpif_cast(port_);
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(port->up.ofproto);
@@ -1634,13 +1812,13 @@ port_destruct(struct ofport *port_)
     const char *dp_port_name;
 
     ofproto->backer->need_revalidate = REV_RECONFIGURE;
-    ovs_rwlock_wrlock(&xlate_rwlock);
+    xlate_txn_start();
     xlate_ofport_remove(port);
-    ovs_rwlock_unlock(&xlate_rwlock);
+    xlate_txn_commit();
 
     dp_port_name = netdev_vport_get_dpif_port(port->up.netdev, namebuf,
                                               sizeof namebuf);
-    if (dpif_port_exists(ofproto->backer->dpif, dp_port_name)) {
+    if (del && dpif_port_exists(ofproto->backer->dpif, dp_port_name)) {
         /* The underlying device is still there, so delete it.  This
          * happens when the ofproto is being destroyed, since the caller
          * assumes that removal of attached ports will happen as part of
@@ -1661,15 +1839,25 @@ port_destruct(struct ofport *port_)
         ovs_rwlock_unlock(&ofproto->backer->odp_to_ofport_lock);
     }
 
+    if (port->is_tunnel) {
+        atomic_count_dec(&ofproto->backer->tnl_count);
+    }
+
+    if (port->is_tunnel && ofproto->ipfix) {
+       dpif_ipfix_del_tunnel_port(ofproto->ipfix, port->odp_port);
+    }
+
     tnl_port_del(port);
     sset_find_and_delete(&ofproto->ports, devname);
     sset_find_and_delete(&ofproto->ghost_ports, devname);
     bundle_remove(port_);
     set_cfm(port_, NULL);
     set_bfd(port_, NULL);
+    set_lldp(port_, NULL);
     if (port->stp_port) {
         stp_port_disable(port->stp_port);
     }
+    set_rstp_port(port_, NULL);
     if (ofproto->sflow) {
         dpif_sflow_del_port(ofproto->sflow, port->odp_port);
     }
@@ -1681,26 +1869,35 @@ static void
 port_modified(struct ofport *port_)
 {
     struct ofport_dpif *port = ofport_dpif_cast(port_);
+    char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
+    const char *dp_port_name;
+    struct netdev *netdev = port->up.netdev;
 
     if (port->bundle && port->bundle->bond) {
-        bond_slave_set_netdev(port->bundle->bond, port, port->up.netdev);
+        bond_slave_set_netdev(port->bundle->bond, port, netdev);
     }
 
     if (port->cfm) {
-        cfm_set_netdev(port->cfm, port->up.netdev);
+        cfm_set_netdev(port->cfm, netdev);
     }
 
     if (port->bfd) {
-        bfd_set_netdev(port->bfd, port->up.netdev);
+        bfd_set_netdev(port->bfd, netdev);
     }
 
     ofproto_dpif_monitor_port_update(port, port->bfd, port->cfm,
-                                     port->up.pp.hw_addr);
+                                     port->lldp, &port->up.pp.hw_addr);
+
+    dp_port_name = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
+
+    if (port->is_tunnel) {
+        struct ofproto_dpif *ofproto = ofproto_dpif_cast(port->up.ofproto);
 
-    if (port->is_tunnel && tnl_port_reconfigure(port, port->up.netdev,
-                                                port->odp_port)) {
-        ofproto_dpif_cast(port->up.ofproto)->backer->need_revalidate =
-            REV_RECONFIGURE;
+        if (tnl_port_reconfigure(port, netdev, port->odp_port,
+                                 ovs_native_tunneling_is_on(ofproto),
+                                 dp_port_name)) {
+            ofproto->backer->need_revalidate = REV_RECONFIGURE;
+        }
     }
 
     ofport_update_peer(port);
@@ -1732,6 +1929,7 @@ set_sflow(struct ofproto *ofproto_,
     struct dpif_sflow *ds = ofproto->sflow;
 
     if (sflow_options) {
+        uint32_t old_probability = ds ? dpif_sflow_get_probability(ds) : 0;
         if (!ds) {
             struct ofport_dpif *ofport;
 
@@ -1739,9 +1937,11 @@ set_sflow(struct ofproto *ofproto_,
             HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
                 dpif_sflow_add_port(ds, &ofport->up, ofport->odp_port);
             }
-            ofproto->backer->need_revalidate = REV_RECONFIGURE;
         }
         dpif_sflow_set_options(ds, sflow_options);
+        if (dpif_sflow_get_probability(ds) != old_probability) {
+            ofproto->backer->need_revalidate = REV_RECONFIGURE;
+        }
     } else {
         if (ds) {
             dpif_sflow_unref(ds);
@@ -1762,9 +1962,11 @@ set_ipfix(
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
     struct dpif_ipfix *di = ofproto->ipfix;
     bool has_options = bridge_exporter_options || flow_exporters_options;
+    bool new_di = false;
 
     if (has_options && !di) {
         di = ofproto->ipfix = dpif_ipfix_create();
+        new_di = true;
     }
 
     if (di) {
@@ -1774,6 +1976,16 @@ set_ipfix(
             di, bridge_exporter_options, flow_exporters_options,
             n_flow_exporters_options);
 
+        /* Add tunnel ports only when a new ipfix created */
+        if (new_di == true) {
+            struct ofport_dpif *ofport;
+            HMAP_FOR_EACH (ofport, up.hmap_node, &ofproto->up.ports) {
+                if (ofport->is_tunnel == true) {
+                    dpif_ipfix_add_tunnel_port(di, &ofport->up, ofport->odp_port);
+                }
+            }
+        }
+
         if (!has_options) {
             dpif_ipfix_unref(di);
             ofproto->ipfix = NULL;
@@ -1787,14 +1999,12 @@ static int
 set_cfm(struct ofport *ofport_, const struct cfm_settings *s)
 {
     struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
+    struct cfm *old = ofport->cfm;
     int error = 0;
 
     if (s) {
         if (!ofport->cfm) {
-            struct ofproto_dpif *ofproto;
-
-            ofproto = ofproto_dpif_cast(ofport->up.ofproto);
-            ofproto->backer->need_revalidate = REV_RECONFIGURE;
             ofport->cfm = cfm_create(ofport->up.netdev);
         }
 
@@ -1808,27 +2018,36 @@ set_cfm(struct ofport *ofport_, const struct cfm_settings *s)
     cfm_unref(ofport->cfm);
     ofport->cfm = NULL;
 out:
+    if (ofport->cfm != old) {
+        ofproto->backer->need_revalidate = REV_RECONFIGURE;
+    }
     ofproto_dpif_monitor_port_update(ofport, ofport->bfd, ofport->cfm,
-                                     ofport->up.pp.hw_addr);
+                                     ofport->lldp, &ofport->up.pp.hw_addr);
     return error;
 }
 
 static bool
+cfm_status_changed(struct ofport *ofport_)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+
+    return ofport->cfm ? cfm_check_status_change(ofport->cfm) : true;
+}
+
+static int
 get_cfm_status(const struct ofport *ofport_,
-               struct ofproto_cfm_status *status)
+               struct cfm_status *status)
 {
     struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    int ret = 0;
 
     if (ofport->cfm) {
-        status->faults = cfm_get_fault(ofport->cfm);
-        status->flap_count = cfm_get_flap_count(ofport->cfm);
-        status->remote_opstate = cfm_get_opup(ofport->cfm);
-        status->health = cfm_get_health(ofport->cfm);
-        cfm_get_remote_mpids(ofport->cfm, &status->rmps, &status->n_rmps);
-        return true;
+        cfm_get_status(ofport->cfm, status);
     } else {
-        return false;
+        ret = ENOENT;
     }
+
+    return ret;
 }
 
 static int
@@ -1845,27 +2064,131 @@ set_bfd(struct ofport *ofport_, const struct smap *cfg)
         ofproto->backer->need_revalidate = REV_RECONFIGURE;
     }
     ofproto_dpif_monitor_port_update(ofport, ofport->bfd, ofport->cfm,
-                                     ofport->up.pp.hw_addr);
+                                     ofport->lldp, &ofport->up.pp.hw_addr);
     return 0;
 }
 
+static bool
+bfd_status_changed(struct ofport *ofport_)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+
+    return ofport->bfd ? bfd_check_status_change(ofport->bfd) : true;
+}
+
 static int
 get_bfd_status(struct ofport *ofport_, struct smap *smap)
 {
     struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    int ret = 0;
 
     if (ofport->bfd) {
         bfd_get_status(ofport->bfd, smap);
-        return 0;
     } else {
-        return ENOENT;
+        ret = ENOENT;
+    }
+
+    return ret;
+}
+
+static int
+set_lldp(struct ofport *ofport_,
+         const struct smap *cfg)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    int error = 0;
+
+    if (cfg) {
+        if (!ofport->lldp) {
+            struct ofproto_dpif *ofproto;
+
+            ofproto = ofproto_dpif_cast(ofport->up.ofproto);
+            ofproto->backer->need_revalidate = REV_RECONFIGURE;
+            ofport->lldp = lldp_create(ofport->up.netdev, ofport_->mtu, cfg);
+        }
+
+        if (!lldp_configure(ofport->lldp, cfg)) {
+            error = EINVAL;
+        }
     }
+    if (error) {
+        lldp_unref(ofport->lldp);
+        ofport->lldp = NULL;
+    }
+
+    ofproto_dpif_monitor_port_update(ofport,
+                                     ofport->bfd,
+                                     ofport->cfm,
+                                     ofport->lldp,
+                                     &ofport->up.pp.hw_addr);
+    return error;
+}
+
+static bool
+get_lldp_status(const struct ofport *ofport_,
+               struct lldp_status *status OVS_UNUSED)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+
+    return ofport->lldp ? true : false;
+}
+
+static int
+set_aa(struct ofproto *ofproto OVS_UNUSED,
+       const struct aa_settings *s)
+{
+    return aa_configure(s);
+}
+
+static int
+aa_mapping_set(struct ofproto *ofproto_ OVS_UNUSED, void *aux,
+               const struct aa_mapping_settings *s)
+{
+    return aa_mapping_register(aux, s);
+}
+
+static int
+aa_mapping_unset(struct ofproto *ofproto OVS_UNUSED, void *aux)
+{
+    return aa_mapping_unregister(aux);
+}
+
+static int
+aa_vlan_get_queued(struct ofproto *ofproto OVS_UNUSED, struct ovs_list *list)
+{
+    return aa_get_vlan_queued(list);
+}
+
+static unsigned int
+aa_vlan_get_queue_size(struct ofproto *ofproto OVS_UNUSED)
+{
+    return aa_get_vlan_queue_size();
 }
+
 \f
 /* Spanning Tree. */
 
+/* Called while rstp_mutex is held. */
+static void
+rstp_send_bpdu_cb(struct dp_packet *pkt, void *ofport_, void *ofproto_)
+{
+    struct ofproto_dpif *ofproto = ofproto_;
+    struct ofport_dpif *ofport = ofport_;
+    struct eth_header *eth = dp_packet_l2(pkt);
+
+    netdev_get_etheraddr(ofport->up.netdev, &eth->eth_src);
+    if (eth_addr_is_zero(eth->eth_src)) {
+        VLOG_WARN_RL(&rl, "%s port %d: cannot send RSTP BPDU on a port which "
+                     "does not have a configured source MAC address.",
+                     ofproto->up.name, ofp_to_u16(ofport->up.ofp_port));
+    } else {
+        ofproto_dpif_send_packet(ofport, pkt);
+    }
+    dp_packet_delete(pkt);
+}
+
 static void
-send_bpdu_cb(struct ofpbuf *pkt, int port_num, void *ofproto_)
+send_bpdu_cb(struct dp_packet *pkt, int port_num, void *ofproto_)
 {
     struct ofproto_dpif *ofproto = ofproto_;
     struct stp_port *sp = stp_get_port(ofproto->stp, port_num);
@@ -1876,9 +2199,9 @@ send_bpdu_cb(struct ofpbuf *pkt, int port_num, void *ofproto_)
         VLOG_WARN_RL(&rl, "%s: cannot send BPDU on unknown port %d",
                      ofproto->up.name, port_num);
     } else {
-        struct eth_header *eth = ofpbuf_l2(pkt);
+        struct eth_header *eth = dp_packet_l2(pkt);
 
-        netdev_get_etheraddr(ofport->up.netdev, eth->eth_src);
+        netdev_get_etheraddr(ofport->up.netdev, &eth->eth_src);
         if (eth_addr_is_zero(eth->eth_src)) {
             VLOG_WARN_RL(&rl, "%s: cannot send BPDU on port %d "
                          "with unknown MAC", ofproto->up.name, port_num);
@@ -1886,25 +2209,172 @@ send_bpdu_cb(struct ofpbuf *pkt, int port_num, void *ofproto_)
             ofproto_dpif_send_packet(ofport, pkt);
         }
     }
-    ofpbuf_delete(pkt);
+    dp_packet_delete(pkt);
 }
 
-/* ConfigureSTP on 'ofproto_' using the settings defined in 's'. */
-static int
-set_stp(struct ofproto *ofproto_, const struct ofproto_stp_settings *s)
+/* Configure RSTP on 'ofproto_' using the settings defined in 's'. */
+static void
+set_rstp(struct ofproto *ofproto_, const struct ofproto_rstp_settings *s)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
 
     /* Only revalidate flows if the configuration changed. */
-    if (!s != !ofproto->stp) {
+    if (!s != !ofproto->rstp) {
         ofproto->backer->need_revalidate = REV_RECONFIGURE;
     }
 
     if (s) {
-        if (!ofproto->stp) {
-            ofproto->stp = stp_create(ofproto_->name, s->system_id,
-                                      send_bpdu_cb, ofproto);
-            ofproto->stp_last_tick = time_msec();
+        if (!ofproto->rstp) {
+            ofproto->rstp = rstp_create(ofproto_->name, s->address,
+                                        rstp_send_bpdu_cb, ofproto);
+            ofproto->rstp_last_tick = time_msec();
+        }
+        rstp_set_bridge_address(ofproto->rstp, s->address);
+        rstp_set_bridge_priority(ofproto->rstp, s->priority);
+        rstp_set_bridge_ageing_time(ofproto->rstp, s->ageing_time);
+        rstp_set_bridge_force_protocol_version(ofproto->rstp,
+                                               s->force_protocol_version);
+        rstp_set_bridge_max_age(ofproto->rstp, s->bridge_max_age);
+        rstp_set_bridge_forward_delay(ofproto->rstp, s->bridge_forward_delay);
+        rstp_set_bridge_transmit_hold_count(ofproto->rstp,
+                                            s->transmit_hold_count);
+    } else {
+        struct ofport *ofport;
+        HMAP_FOR_EACH (ofport, hmap_node, &ofproto->up.ports) {
+            set_rstp_port(ofport, NULL);
+        }
+        rstp_unref(ofproto->rstp);
+        ofproto->rstp = NULL;
+    }
+}
+
+static void
+get_rstp_status(struct ofproto *ofproto_, struct ofproto_rstp_status *s)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    if (ofproto->rstp) {
+        s->enabled = true;
+        s->root_id = rstp_get_root_id(ofproto->rstp);
+        s->bridge_id = rstp_get_bridge_id(ofproto->rstp);
+        s->designated_id = rstp_get_designated_id(ofproto->rstp);
+        s->root_path_cost = rstp_get_root_path_cost(ofproto->rstp);
+        s->designated_port_id = rstp_get_designated_port_id(ofproto->rstp);
+        s->bridge_port_id = rstp_get_bridge_port_id(ofproto->rstp);
+    } else {
+        s->enabled = false;
+    }
+}
+
+static void
+update_rstp_port_state(struct ofport_dpif *ofport)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
+    enum rstp_state state;
+
+    /* Figure out new state. */
+    state = ofport->rstp_port ? rstp_port_get_state(ofport->rstp_port)
+        : RSTP_DISABLED;
+
+    /* Update state. */
+    if (ofport->rstp_state != state) {
+        enum ofputil_port_state of_state;
+        bool fwd_change;
+
+        VLOG_DBG("port %s: RSTP state changed from %s to %s",
+                 netdev_get_name(ofport->up.netdev),
+                 rstp_state_name(ofport->rstp_state),
+                 rstp_state_name(state));
+
+        if (rstp_learn_in_state(ofport->rstp_state)
+            != rstp_learn_in_state(state)) {
+            /* XXX: Learning action flows should also be flushed. */
+            if (ofport->bundle) {
+                if (!rstp_shift_root_learned_address(ofproto->rstp)
+                    || rstp_get_old_root_aux(ofproto->rstp) != ofport) {
+                    bundle_flush_macs(ofport->bundle, false);
+                }
+            }
+        }
+        fwd_change = rstp_forward_in_state(ofport->rstp_state)
+            != rstp_forward_in_state(state);
+
+        ofproto->backer->need_revalidate = REV_RSTP;
+        ofport->rstp_state = state;
+
+        if (fwd_change && ofport->bundle) {
+            bundle_update(ofport->bundle);
+        }
+
+        /* Update the RSTP state bits in the OpenFlow port description. */
+        of_state = ofport->up.pp.state & ~OFPUTIL_PS_STP_MASK;
+        of_state |= (state == RSTP_LEARNING ? OFPUTIL_PS_STP_LEARN
+                : state == RSTP_FORWARDING ? OFPUTIL_PS_STP_FORWARD
+                : state == RSTP_DISCARDING ?  OFPUTIL_PS_STP_LISTEN
+                : 0);
+        ofproto_port_set_state(&ofport->up, of_state);
+    }
+}
+
+static void
+rstp_run(struct ofproto_dpif *ofproto)
+{
+    if (ofproto->rstp) {
+        long long int now = time_msec();
+        long long int elapsed = now - ofproto->rstp_last_tick;
+        struct rstp_port *rp;
+        struct ofport_dpif *ofport;
+
+        /* Every second, decrease the values of the timers. */
+        if (elapsed >= 1000) {
+            rstp_tick_timers(ofproto->rstp);
+            ofproto->rstp_last_tick = now;
+        }
+        rp = NULL;
+        while ((ofport = rstp_get_next_changed_port_aux(ofproto->rstp, &rp))) {
+            update_rstp_port_state(ofport);
+        }
+        rp = NULL;
+        ofport = NULL;
+        /* FIXME: This check should be done on-event (i.e., when setting
+         * p->fdb_flush) and not periodically.
+         */
+        while ((ofport = rstp_check_and_reset_fdb_flush(ofproto->rstp, &rp))) {
+            if (!rstp_shift_root_learned_address(ofproto->rstp)
+                || rstp_get_old_root_aux(ofproto->rstp) != ofport) {
+                bundle_flush_macs(ofport->bundle, false);
+            }
+        }
+
+        if (rstp_shift_root_learned_address(ofproto->rstp)) {
+            struct ofport_dpif *old_root_aux =
+                (struct ofport_dpif *)rstp_get_old_root_aux(ofproto->rstp);
+            struct ofport_dpif *new_root_aux =
+                (struct ofport_dpif *)rstp_get_new_root_aux(ofproto->rstp);
+            if (old_root_aux != NULL && new_root_aux != NULL) {
+                bundle_move(old_root_aux->bundle, new_root_aux->bundle);
+                rstp_reset_root_changed(ofproto->rstp);
+            }
+        }
+    }
+}
+
+/* Configures STP on 'ofproto_' using the settings defined in 's'. */
+static int
+set_stp(struct ofproto *ofproto_, const struct ofproto_stp_settings *s)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    /* Only revalidate flows if the configuration changed. */
+    if (!s != !ofproto->stp) {
+        ofproto->backer->need_revalidate = REV_RECONFIGURE;
+    }
+
+    if (s) {
+        if (!ofproto->stp) {
+            ofproto->stp = stp_create(ofproto_->name, s->system_id,
+                                      send_bpdu_cb, ofproto);
+            ofproto->stp_last_tick = time_msec();
         }
 
         stp_set_bridge_id(ofproto->stp, s->system_id);
@@ -1958,16 +2428,17 @@ update_stp_port_state(struct ofport_dpif *ofport)
         enum ofputil_port_state of_state;
         bool fwd_change;
 
-        VLOG_DBG_RL(&rl, "port %s: STP state changed from %s to %s",
-                    netdev_get_name(ofport->up.netdev),
-                    stp_state_name(ofport->stp_state),
-                    stp_state_name(state));
+        VLOG_DBG("port %s: STP state changed from %s to %s",
+                 netdev_get_name(ofport->up.netdev),
+                 stp_state_name(ofport->stp_state),
+                 stp_state_name(state));
         if (stp_learn_in_state(ofport->stp_state)
                 != stp_learn_in_state(state)) {
             /* xxx Learning action flows should also be flushed. */
             ovs_rwlock_wrlock(&ofproto->ml->rwlock);
             mac_learning_flush(ofproto->ml);
             ovs_rwlock_unlock(&ofproto->ml->rwlock);
+            mcast_snooping_mdb_flush(ofproto->ms);
         }
         fwd_change = stp_forward_in_state(ofport->stp_state)
                         != stp_forward_in_state(state);
@@ -2010,13 +2481,17 @@ set_stp_port(struct ofport *ofport_,
         }
         return 0;
     } else if (sp && stp_port_no(sp) != s->port_num
-            && ofport == stp_port_get_aux(sp)) {
+               && ofport == stp_port_get_aux(sp)) {
         /* The port-id changed, so disable the old one if it's not
          * already in use by another port. */
         stp_port_disable(sp);
     }
 
     sp = ofport->stp_port = stp_get_port(ofproto->stp, s->port_num);
+
+    /* Set name before enabling the port so that debugging messages can print
+     * the name. */
+    stp_port_set_name(sp, netdev_get_name(ofport->up.netdev));
     stp_port_enable(sp);
 
     stp_port_set_aux(sp, ofport);
@@ -2093,6 +2568,7 @@ stp_run(struct ofproto_dpif *ofproto)
             ovs_rwlock_wrlock(&ofproto->ml->rwlock);
             mac_learning_flush(ofproto->ml);
             ovs_rwlock_unlock(&ofproto->ml->rwlock);
+            mcast_snooping_mdb_flush(ofproto->ms);
         }
     }
 }
@@ -2104,6 +2580,64 @@ stp_wait(struct ofproto_dpif *ofproto)
         poll_timer_wait(1000);
     }
 }
+
+/* Configures RSTP on 'ofport_' using the settings defined in 's'.  The
+ * caller is responsible for assigning RSTP port numbers and ensuring
+ * there are no duplicates. */
+static void
+set_rstp_port(struct ofport *ofport_,
+              const struct ofproto_port_rstp_settings *s)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
+    struct rstp_port *rp = ofport->rstp_port;
+
+    if (!s || !s->enable) {
+        if (rp) {
+            rstp_port_set_aux(rp, NULL);
+            rstp_port_set_state(rp, RSTP_DISABLED);
+            rstp_port_set_mac_operational(rp, false);
+            ofport->rstp_port = NULL;
+            rstp_port_unref(rp);
+            update_rstp_port_state(ofport);
+        }
+        return;
+    }
+
+    /* Check if need to add a new port. */
+    if (!rp) {
+        rp = ofport->rstp_port = rstp_add_port(ofproto->rstp);
+    }
+
+    rstp_port_set(rp, s->port_num, s->priority, s->path_cost,
+                  s->admin_edge_port, s->auto_edge,
+                  s->admin_p2p_mac_state, s->admin_port_state, s->mcheck,
+                  ofport);
+    update_rstp_port_state(ofport);
+    /* Synchronize operational status. */
+    rstp_port_set_mac_operational(rp, ofport->may_enable);
+}
+
+static void
+get_rstp_port_status(struct ofport *ofport_,
+        struct ofproto_port_rstp_status *s)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
+    struct rstp_port *rp = ofport->rstp_port;
+
+    if (!ofproto->rstp || !rp) {
+        s->enabled = false;
+        return;
+    }
+
+    s->enabled = true;
+    rstp_port_get_status(rp, &s->port_id, &s->state, &s->role,
+                         &s->designated_bridge_id, &s->designated_port_id,
+                         &s->designated_path_cost, &s->tx_count,
+                         &s->rx_count, &s->error_count, &s->uptime);
+}
+
 \f
 static int
 set_queues(struct ofport *ofport_, const struct ofproto_port_queue *qdscp,
@@ -2147,7 +2681,7 @@ bundle_flush_macs(struct ofbundle *bundle, bool all_ofprotos)
     ofproto->backer->need_revalidate = REV_RECONFIGURE;
     ovs_rwlock_wrlock(&ml->rwlock);
     LIST_FOR_EACH_SAFE (mac, next_mac, lru_node, &ml->lrus) {
-        if (mac->port.p == bundle) {
+        if (mac_entry_get_port(ml, mac) == bundle) {
             if (all_ofprotos) {
                 struct ofproto_dpif *o;
 
@@ -2171,6 +2705,25 @@ bundle_flush_macs(struct ofbundle *bundle, bool all_ofprotos)
     ovs_rwlock_unlock(&ml->rwlock);
 }
 
+static void
+bundle_move(struct ofbundle *old, struct ofbundle *new)
+{
+    struct ofproto_dpif *ofproto = old->ofproto;
+    struct mac_learning *ml = ofproto->ml;
+    struct mac_entry *mac, *next_mac;
+
+    ovs_assert(new->ofproto == old->ofproto);
+
+    ofproto->backer->need_revalidate = REV_RECONFIGURE;
+    ovs_rwlock_wrlock(&ml->rwlock);
+    LIST_FOR_EACH_SAFE (mac, next_mac, lru_node, &ml->lrus) {
+        if (mac_entry_get_port(ml, mac) == old) {
+            mac_entry_set_port(ml, mac, new);
+        }
+    }
+    ovs_rwlock_unlock(&ml->rwlock);
+}
+
 static struct ofbundle *
 bundle_lookup(const struct ofproto_dpif *ofproto, void *aux)
 {
@@ -2194,7 +2747,8 @@ bundle_update(struct ofbundle *bundle)
     LIST_FOR_EACH (port, bundle_node, &bundle->ports) {
         if (port->up.pp.config & OFPUTIL_PC_NO_FLOOD
             || port->is_layer3
-            || !stp_forward_in_state(port->stp_state)) {
+            || (bundle->ofproto->stp && !stp_forward_in_state(port->stp_state))
+            || (bundle->ofproto->rstp && !rstp_forward_in_state(port->rstp_state))) {
             bundle->floodable = false;
             break;
         }
@@ -2227,7 +2781,7 @@ bundle_add_port(struct ofbundle *bundle, ofp_port_t ofp_port,
 {
     struct ofport_dpif *port;
 
-    port = get_ofp_port(bundle->ofproto, ofp_port);
+    port = ofp_port_to_ofport(bundle->ofproto, ofp_port);
     if (!port) {
         return false;
     }
@@ -2242,7 +2796,8 @@ bundle_add_port(struct ofbundle *bundle, ofp_port_t ofp_port,
         list_push_back(&bundle->ports, &port->bundle_node);
         if (port->up.pp.config & OFPUTIL_PC_NO_FLOOD
             || port->is_layer3
-            || !stp_forward_in_state(port->stp_state)) {
+            || (bundle->ofproto->stp && !stp_forward_in_state(port->stp_state))
+            || (bundle->ofproto->rstp && !rstp_forward_in_state(port->rstp_state))) {
             bundle->floodable = false;
         }
     }
@@ -2265,11 +2820,11 @@ bundle_destroy(struct ofbundle *bundle)
     }
 
     ofproto = bundle->ofproto;
-    mbridge_unregister_bundle(ofproto->mbridge, bundle->aux);
+    mbridge_unregister_bundle(ofproto->mbridge, bundle);
 
-    ovs_rwlock_wrlock(&xlate_rwlock);
+    xlate_txn_start();
     xlate_bundle_remove(bundle);
-    ovs_rwlock_unlock(&xlate_rwlock);
+    xlate_txn_commit();
 
     LIST_FOR_EACH_SAFE (port, next_port, bundle_node, &bundle->ports) {
         bundle_del_port(port);
@@ -2488,21 +3043,21 @@ send_pdu_cb(void *port_, const void *pdu, size_t pdu_size)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 10);
     struct ofport_dpif *port = port_;
-    uint8_t ea[ETH_ADDR_LEN];
+    struct eth_addr ea;
     int error;
 
-    error = netdev_get_etheraddr(port->up.netdev, ea);
+    error = netdev_get_etheraddr(port->up.netdev, &ea);
     if (!error) {
-        struct ofpbuf packet;
+        struct dp_packet packet;
         void *packet_pdu;
 
-        ofpbuf_init(&packet, 0);
+        dp_packet_init(&packet, 0);
         packet_pdu = eth_compose(&packet, eth_addr_lacp, ea, ETH_TYPE_LACP,
                                  pdu_size);
         memcpy(packet_pdu, pdu, pdu_size);
 
         ofproto_dpif_send_packet(port, &packet);
-        ofpbuf_uninit(&packet);
+        dp_packet_uninit(&packet);
     } else {
         VLOG_ERR_RL(&rl, "port %s: cannot obtain Ethernet address of iface "
                     "%s (%s)", port->bundle->name,
@@ -2514,43 +3069,41 @@ static void
 bundle_send_learning_packets(struct ofbundle *bundle)
 {
     struct ofproto_dpif *ofproto = bundle->ofproto;
-    struct ofpbuf *learning_packet;
     int error, n_packets, n_errors;
     struct mac_entry *e;
-    struct list packets;
+    struct pkt_list {
+        struct ovs_list list_node;
+        struct ofport_dpif *port;
+        struct dp_packet *pkt;
+    } *pkt_node;
+    struct ovs_list packets;
 
     list_init(&packets);
     ovs_rwlock_rdlock(&ofproto->ml->rwlock);
     LIST_FOR_EACH (e, lru_node, &ofproto->ml->lrus) {
-        if (e->port.p != bundle) {
-            void *port_void;
-
-            learning_packet = bond_compose_learning_packet(bundle->bond,
-                                                           e->mac, e->vlan,
-                                                           &port_void);
-            /* Temporarily use 'frame' as a private pointer (see below). */
-            ovs_assert(learning_packet->frame == ofpbuf_data(learning_packet));
-            learning_packet->frame = port_void;
-            list_push_back(&packets, &learning_packet->list_node);
+        if (mac_entry_get_port(ofproto->ml, e) != bundle) {
+            pkt_node = xmalloc(sizeof *pkt_node);
+            pkt_node->pkt = bond_compose_learning_packet(bundle->bond,
+                                                         e->mac, e->vlan,
+                                                         (void **)&pkt_node->port);
+            list_push_back(&packets, &pkt_node->list_node);
         }
     }
     ovs_rwlock_unlock(&ofproto->ml->rwlock);
 
     error = n_packets = n_errors = 0;
-    LIST_FOR_EACH (learning_packet, list_node, &packets) {
+    LIST_FOR_EACH_POP (pkt_node, list_node, &packets) {
         int ret;
-        void *port_void = learning_packet->frame;
 
-        /* Restore 'frame'. */
-        learning_packet->frame = ofpbuf_data(learning_packet);
-        ret = ofproto_dpif_send_packet(port_void, learning_packet);
+        ret = ofproto_dpif_send_packet(pkt_node->port, pkt_node->pkt);
+        dp_packet_delete(pkt_node->pkt);
+        free(pkt_node);
         if (ret) {
             error = ret;
             n_errors++;
         }
         n_packets++;
     }
-    ofpbuf_list_delete(&packets);
 
     if (n_errors) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -2677,11 +3230,63 @@ set_mac_table_config(struct ofproto *ofproto_, unsigned int idle_time,
     mac_learning_set_max_entries(ofproto->ml, max_entries);
     ovs_rwlock_unlock(&ofproto->ml->rwlock);
 }
+
+/* Configures multicast snooping on 'ofport' using the settings
+ * defined in 's'. */
+static int
+set_mcast_snooping(struct ofproto *ofproto_,
+                   const struct ofproto_mcast_snooping_settings *s)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    /* Only revalidate flows if the configuration changed. */
+    if (!s != !ofproto->ms) {
+        ofproto->backer->need_revalidate = REV_RECONFIGURE;
+    }
+
+    if (s) {
+        if (!ofproto->ms) {
+            ofproto->ms = mcast_snooping_create();
+        }
+
+        ovs_rwlock_wrlock(&ofproto->ms->rwlock);
+        mcast_snooping_set_idle_time(ofproto->ms, s->idle_time);
+        mcast_snooping_set_max_entries(ofproto->ms, s->max_entries);
+        if (mcast_snooping_set_flood_unreg(ofproto->ms, s->flood_unreg)) {
+            ofproto->backer->need_revalidate = REV_RECONFIGURE;
+        }
+        ovs_rwlock_unlock(&ofproto->ms->rwlock);
+    } else {
+        mcast_snooping_unref(ofproto->ms);
+        ofproto->ms = NULL;
+    }
+
+    return 0;
+}
+
+/* Configures multicast snooping port's flood settings on 'ofproto'. */
+static int
+set_mcast_snooping_port(struct ofproto *ofproto_, void *aux,
+                        const struct ofproto_mcast_snooping_port_settings *s)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+    struct ofbundle *bundle = bundle_lookup(ofproto, aux);
+
+    if (ofproto->ms && s) {
+        ovs_rwlock_wrlock(&ofproto->ms->rwlock);
+        mcast_snooping_set_port_flood(ofproto->ms, bundle, s->flood);
+        mcast_snooping_set_port_flood_reports(ofproto->ms, bundle,
+                                              s->flood_reports);
+        ovs_rwlock_unlock(&ofproto->ms->rwlock);
+    }
+    return 0;
+}
+
 \f
 /* Ports. */
 
-static struct ofport_dpif *
-get_ofp_port(const struct ofproto_dpif *ofproto, ofp_port_t ofp_port)
+struct ofport_dpif *
+ofp_port_to_ofport(const struct ofproto_dpif *ofproto, ofp_port_t ofp_port)
 {
     struct ofport *ofport = ofproto_get_port(&ofproto->up, ofp_port);
     return ofport ? ofport_dpif_cast(ofport) : NULL;
@@ -2787,7 +3392,12 @@ port_run(struct ofport_dpif *ofport)
 
     if (ofport->may_enable != enable) {
         struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
+
         ofproto->backer->need_revalidate = REV_PORT_TOGGLED;
+
+        if (ofport->rstp_port) {
+            rstp_port_set_mac_operational(ofport->rstp_port, enable);
+        }
     }
 
     ofport->may_enable = enable;
@@ -2870,7 +3480,7 @@ static int
 port_del(struct ofproto *ofproto_, ofp_port_t ofp_port)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    struct ofport_dpif *ofport = get_ofp_port(ofproto, ofp_port);
+    struct ofport_dpif *ofport = ofp_port_to_ofport(ofproto, ofp_port);
     int error = 0;
 
     if (!ofport) {
@@ -2935,6 +3545,18 @@ port_get_stats(const struct ofport *ofport_, struct netdev_stats *stats)
     return error;
 }
 
+static int
+port_get_lacp_stats(const struct ofport *ofport_, struct lacp_slave_stats *stats)
+{
+    struct ofport_dpif *ofport = ofport_dpif_cast(ofport_);
+    if (ofport->bundle && ofport->bundle->lacp) {
+        if (lacp_get_slave_stats(ofport->bundle->lacp, ofport, stats)) {
+            return 0;
+        }
+    }
+    return -1;
+}
+
 struct port_dump_state {
     uint32_t bucket;
     uint32_t offset;
@@ -3045,8 +3667,6 @@ rule_expire(struct rule_dpif *rule)
     long long int now = time_msec();
     int reason = -1;
 
-    ovs_assert(!rule->up.pending);
-
     hard_timeout = rule->up.hard_timeout;
     idle_timeout = rule->up.idle_timeout;
 
@@ -3081,14 +3701,13 @@ rule_expire(struct rule_dpif *rule)
     }
 }
 
-/* Executes, within 'ofproto', the actions in 'rule' or 'ofpacts' on 'packet'.
- * 'flow' must reflect the data in 'packet'. */
 int
-ofproto_dpif_execute_actions(struct ofproto_dpif *ofproto,
-                             const struct flow *flow,
-                             struct rule_dpif *rule,
-                             const struct ofpact *ofpacts, size_t ofpacts_len,
-                             struct ofpbuf *packet)
+ofproto_dpif_execute_actions__(struct ofproto_dpif *ofproto,
+                               const struct flow *flow,
+                               struct rule_dpif *rule,
+                               const struct ofpact *ofpacts, size_t ofpacts_len,
+                               int recurse, int resubmits,
+                               struct dp_packet *packet)
 {
     struct dpif_flow_stats stats;
     struct xlate_out xout;
@@ -3105,61 +3724,72 @@ ofproto_dpif_execute_actions(struct ofproto_dpif *ofproto,
         rule_dpif_credit_stats(rule, &stats);
     }
 
-    xlate_in_init(&xin, ofproto, flow, rule, stats.tcp_flags, packet);
+    uint64_t odp_actions_stub[1024 / 8];
+    struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
+    xlate_in_init(&xin, ofproto, flow, flow->in_port.ofp_port, rule,
+                  stats.tcp_flags, packet, NULL, &odp_actions);
     xin.ofpacts = ofpacts;
     xin.ofpacts_len = ofpacts_len;
     xin.resubmit_stats = &stats;
-    xlate_actions(&xin, &xout);
+    xin.recurse = recurse;
+    xin.resubmits = resubmits;
+    if (xlate_actions(&xin, &xout) != XLATE_OK) {
+        error = EINVAL;
+        goto out;
+    }
+
+    execute.actions = odp_actions.data;
+    execute.actions_len = odp_actions.size;
 
+    pkt_metadata_from_flow(&packet->md, flow);
+    execute.packet = packet;
+    execute.needs_help = (xout.slow & SLOW_ACTION) != 0;
+    execute.probe = false;
+    execute.mtu = 0;
+
+    /* Fix up in_port. */
     in_port = flow->in_port.ofp_port;
     if (in_port == OFPP_NONE) {
         in_port = OFPP_LOCAL;
     }
-    execute.actions = ofpbuf_data(&xout.odp_actions);
-    execute.actions_len = ofpbuf_size(&xout.odp_actions);
-    execute.packet = packet;
-    execute.md.tunnel = flow->tunnel;
-    execute.md.skb_priority = flow->skb_priority;
-    execute.md.pkt_mark = flow->pkt_mark;
-    execute.md.in_port.odp_port = ofp_port_to_odp_port(ofproto, in_port);
-    execute.needs_help = (xout.slow & SLOW_ACTION) != 0;
+    execute.packet->md.in_port.odp_port = ofp_port_to_odp_port(ofproto, in_port);
 
     error = dpif_execute(ofproto->backer->dpif, &execute);
-
+out:
     xlate_out_uninit(&xout);
+    ofpbuf_uninit(&odp_actions);
 
     return error;
 }
 
+/* Executes, within 'ofproto', the actions in 'rule' or 'ofpacts' on 'packet'.
+ * 'flow' must reflect the data in 'packet'. */
+int
+ofproto_dpif_execute_actions(struct ofproto_dpif *ofproto,
+                             const struct flow *flow,
+                             struct rule_dpif *rule,
+                             const struct ofpact *ofpacts, size_t ofpacts_len,
+                             struct dp_packet *packet)
+{
+    return ofproto_dpif_execute_actions__(ofproto, flow, rule, ofpacts,
+                                          ofpacts_len, 0, 0, packet);
+}
+
 void
 rule_dpif_credit_stats(struct rule_dpif *rule,
                        const struct dpif_flow_stats *stats)
 {
     ovs_mutex_lock(&rule->stats_mutex);
-    rule->stats.n_packets += stats->n_packets;
-    rule->stats.n_bytes += stats->n_bytes;
-    rule->stats.used = MAX(rule->stats.used, stats->used);
+    if (OVS_UNLIKELY(rule->new_rule)) {
+        rule_dpif_credit_stats(rule->new_rule, stats);
+    } else {
+        rule->stats.n_packets += stats->n_packets;
+        rule->stats.n_bytes += stats->n_bytes;
+        rule->stats.used = MAX(rule->stats.used, stats->used);
+    }
     ovs_mutex_unlock(&rule->stats_mutex);
 }
 
-bool
-rule_dpif_is_fail_open(const struct rule_dpif *rule)
-{
-    return is_fail_open_rule(&rule->up);
-}
-
-bool
-rule_dpif_is_table_miss(const struct rule_dpif *rule)
-{
-    return rule_is_table_miss(&rule->up);
-}
-
-bool
-rule_dpif_is_internal(const struct rule_dpif *rule)
-{
-    return rule_is_internal(&rule->up);
-}
-
 ovs_be64
 rule_dpif_get_flow_cookie(const struct rule_dpif *rule)
     OVS_REQUIRES(rule->up.mutex)
@@ -3174,130 +3804,71 @@ rule_dpif_reduce_timeouts(struct rule_dpif *rule, uint16_t idle_timeout,
     ofproto_rule_reduce_timeouts(&rule->up, idle_timeout, hard_timeout);
 }
 
-/* Returns 'rule''s actions.  The caller owns a reference on the returned
- * actions and must eventually release it (with rule_actions_unref()) to avoid
- * a memory leak. */
-struct rule_actions *
+/* Returns 'rule''s actions.  The returned actions are RCU-protected, and can
+ * be read until the calling thread quiesces. */
+const struct rule_actions *
 rule_dpif_get_actions(const struct rule_dpif *rule)
 {
     return rule_get_actions(&rule->up);
 }
 
-/* Lookup 'flow' in table 0 of 'ofproto''s classifier.
- * If 'wc' is non-null, sets the fields that were relevant as part of
- * the lookup. Returns the table_id where a match or miss occurred.
- *
- * The return value will be zero unless there was a miss and
- * OFPTC11_TABLE_MISS_CONTINUE is in effect for the sequence of tables
- * where misses occur. */
-uint8_t
-rule_dpif_lookup(struct ofproto_dpif *ofproto, struct flow *flow,
-                 struct flow_wildcards *wc, struct rule_dpif **rule)
-{
-    enum rule_dpif_lookup_verdict verdict;
-    enum ofputil_port_config config = 0;
-    uint8_t table_id;
-
-    if (ofproto_dpif_get_enable_recirc(ofproto)) {
-        /* Always exactly match recirc_id since datapath supports
-         * recirculation.  */
-        if (wc) {
-            wc->masks.recirc_id = UINT32_MAX;
-        }
-
-        /* Start looking up from internal table for post recirculation flows
-         * or packets. We can also simply send all, including normal flows
-         * or packets to the internal table. They will not match any post
-         * recirculation rules except the 'catch all' rule that resubmit
-         * them to table 0.
-         *
-         * As an optimization, we send normal flows and packets to table 0
-         * directly, saving one table lookup.  */
-        table_id = flow->recirc_id ? TBL_INTERNAL : 0;
+/* Sets 'rule''s recirculation id. */
+static void
+rule_dpif_set_recirc_id(struct rule_dpif *rule, uint32_t id)
+    OVS_REQUIRES(rule->up.mutex)
+{
+    ovs_assert(!rule->recirc_id || rule->recirc_id == id);
+    if (rule->recirc_id == id) {
+        /* Release the new reference to the same id. */
+        recirc_free_id(id);
     } else {
-        table_id = 0;
+        rule->recirc_id = id;
     }
+}
 
-    verdict = rule_dpif_lookup_from_table(ofproto, flow, wc, true,
-                                          &table_id, rule);
+/* Sets 'rule''s recirculation id. */
+void
+rule_set_recirc_id(struct rule *rule_, uint32_t id)
+{
+    struct rule_dpif *rule = rule_dpif_cast(rule_);
 
-    switch (verdict) {
-    case RULE_DPIF_LOOKUP_VERDICT_MATCH:
-        return table_id;
-    case RULE_DPIF_LOOKUP_VERDICT_CONTROLLER: {
-        struct ofport_dpif *port;
+    ovs_mutex_lock(&rule->up.mutex);
+    rule_dpif_set_recirc_id(rule, id);
+    ovs_mutex_unlock(&rule->up.mutex);
+}
 
-        port = get_ofp_port(ofproto, flow->in_port.ofp_port);
-        if (!port) {
-            VLOG_WARN_RL(&rl, "packet-in on unknown OpenFlow port %"PRIu16,
-                         flow->in_port.ofp_port);
-        }
-        config = port ? port->up.pp.config : 0;
-        break;
-    }
-    case RULE_DPIF_LOOKUP_VERDICT_DROP:
-        config = OFPUTIL_PC_NO_PACKET_IN;
-        break;
-    case RULE_DPIF_LOOKUP_VERDICT_DEFAULT:
-        if (!connmgr_wants_packet_in_on_miss(ofproto->up.connmgr)) {
-            config = OFPUTIL_PC_NO_PACKET_IN;
-        }
-        break;
-    default:
-        OVS_NOT_REACHED();
-    }
+cls_version_t
+ofproto_dpif_get_tables_version(struct ofproto_dpif *ofproto OVS_UNUSED)
+{
+    cls_version_t version;
+
+    atomic_read_relaxed(&ofproto->tables_version, &version);
 
-    choose_miss_rule(config, ofproto->miss_rule,
-                     ofproto->no_packet_in_rule, rule);
-    return table_id;
+    return version;
 }
 
+/* The returned rule (if any) is valid at least until the next RCU quiescent
+ * period.  If the rule needs to stay around longer, the caller should take
+ * a reference.
+ *
+ * 'flow' is non-const to allow for temporary modifications during the lookup.
+ * Any changes are restored before returning. */
 static struct rule_dpif *
-rule_dpif_lookup_in_table(struct ofproto_dpif *ofproto, uint8_t table_id,
-                          const struct flow *flow, struct flow_wildcards *wc)
+rule_dpif_lookup_in_table(struct ofproto_dpif *ofproto, cls_version_t version,
+                          uint8_t table_id, struct flow *flow,
+                          struct flow_wildcards *wc)
 {
     struct classifier *cls = &ofproto->up.tables[table_id].cls;
-    const struct cls_rule *cls_rule;
-    struct rule_dpif *rule;
-
-    fat_rwlock_rdlock(&cls->rwlock);
-    if (ofproto->up.frag_handling != OFPC_FRAG_NX_MATCH) {
-        if (wc) {
-            memset(&wc->masks.dl_type, 0xff, sizeof wc->masks.dl_type);
-            if (is_ip_any(flow)) {
-                wc->masks.nw_frag |= FLOW_NW_FRAG_MASK;
-            }
-        }
-
-        if (flow->nw_frag & FLOW_NW_FRAG_ANY) {
-            if (ofproto->up.frag_handling == OFPC_FRAG_NORMAL) {
-                /* We must pretend that transport ports are unavailable. */
-                struct flow ofpc_normal_flow = *flow;
-                ofpc_normal_flow.tp_src = htons(0);
-                ofpc_normal_flow.tp_dst = htons(0);
-                cls_rule = classifier_lookup(cls, &ofpc_normal_flow, wc);
-            } else {
-                /* Must be OFPC_FRAG_DROP (we don't have OFPC_FRAG_REASM). */
-                cls_rule = &ofproto->drop_frags_rule->up.cr;
-            }
-        } else {
-            cls_rule = classifier_lookup(cls, flow, wc);
-        }
-    } else {
-        cls_rule = classifier_lookup(cls, flow, wc);
-    }
-
-    rule = rule_dpif_cast(rule_from_cls_rule(cls_rule));
-    rule_dpif_ref(rule);
-    fat_rwlock_unlock(&cls->rwlock);
-
-    return rule;
+    return rule_dpif_cast(rule_from_cls_rule(classifier_lookup(cls, version,
+                                                               flow, wc)));
 }
 
-/* Look up 'flow' in 'ofproto''s classifier starting from table '*table_id'.
- * Stores the rule that was found in '*rule', or NULL if none was found.
- * Updates 'wc', if nonnull, to reflect the fields that were used during the
- * lookup.
+/* Look up 'flow' in 'ofproto''s classifier version 'version', starting from
+ * table '*table_id'.  Returns the rule that was found, which may be one of the
+ * special rules according to packet miss hadling.  If 'may_packet_in' is
+ * false, returning of the miss_rule (which issues packet ins for the
+ * controller) is avoided.  Updates 'wc', if nonnull, to reflect the fields
+ * that were used during the lookup.
  *
  * If 'honor_table_miss' is true, the first lookup occurs in '*table_id', but
  * if none is found then the table miss configuration for that table is
@@ -3308,86 +3879,114 @@ rule_dpif_lookup_in_table(struct ofproto_dpif *ofproto, uint8_t table_id,
  * If 'honor_table_miss' is false, then only one table lookup occurs, in
  * '*table_id'.
  *
- * Returns:
- *
- *    - RULE_DPIF_LOOKUP_VERDICT_MATCH if a rule (in '*rule') was found.
+ * The rule is returned in '*rule', which is valid at least until the next
+ * RCU quiescent period.  If the '*rule' needs to stay around longer, the
+ * caller must take a reference.
  *
- *    - RULE_OFPTC_TABLE_MISS_CONTROLLER if no rule was found and either:
- *      + 'honor_table_miss' is false
- *      + a table miss configuration specified that the packet should be
- *        sent to the controller in this case.
+ * 'in_port' allows the lookup to take place as if the in port had the value
+ * 'in_port'.  This is needed for resubmit action support.
  *
- *    - RULE_DPIF_LOOKUP_VERDICT_DROP if no rule was found, 'honor_table_miss'
- *      is true and a table miss configuration specified that the packet
- *      should be dropped in this case.
- *
- *    - RULE_DPIF_LOOKUP_VERDICT_DEFAULT if no rule was found,
- *      'honor_table_miss' is true and a table miss configuration has
- *      not been specified in this case. */
-enum rule_dpif_lookup_verdict
+ * 'flow' is non-const to allow for temporary modifications during the lookup.
+ * Any changes are restored before returning. */
+struct rule_dpif *
 rule_dpif_lookup_from_table(struct ofproto_dpif *ofproto,
-                            const struct flow *flow,
+                            cls_version_t version, struct flow *flow,
                             struct flow_wildcards *wc,
-                            bool honor_table_miss,
-                            uint8_t *table_id, struct rule_dpif **rule)
+                            const struct dpif_flow_stats *stats,
+                            uint8_t *table_id, ofp_port_t in_port,
+                            bool may_packet_in, bool honor_table_miss)
 {
+    ovs_be16 old_tp_src = flow->tp_src, old_tp_dst = flow->tp_dst;
+    ofp_port_t old_in_port = flow->in_port.ofp_port;
+    enum ofputil_table_miss miss_config;
+    struct rule_dpif *rule;
     uint8_t next_id;
 
+    /* We always unwildcard nw_frag (for IP), so they
+     * need not be unwildcarded here. */
+    if (flow->nw_frag & FLOW_NW_FRAG_ANY
+        && ofproto->up.frag_handling != OFPUTIL_FRAG_NX_MATCH) {
+        if (ofproto->up.frag_handling == OFPUTIL_FRAG_NORMAL) {
+            /* We must pretend that transport ports are unavailable. */
+            flow->tp_src = htons(0);
+            flow->tp_dst = htons(0);
+        } else {
+            /* Must be OFPUTIL_FRAG_DROP (we don't have OFPUTIL_FRAG_REASM).
+             * Use the drop_frags_rule (which cannot disappear). */
+            rule = ofproto->drop_frags_rule;
+            if (stats) {
+                struct oftable *tbl = &ofproto->up.tables[*table_id];
+                unsigned long orig;
+
+                atomic_add_relaxed(&tbl->n_matched, stats->n_packets, &orig);
+            }
+            return rule;
+        }
+    }
+
+    /* Look up a flow with 'in_port' as the input port.  Then restore the
+     * original input port (otherwise OFPP_NORMAL and OFPP_IN_PORT will
+     * have surprising behavior). */
+    flow->in_port.ofp_port = in_port;
+
+    /* Our current implementation depends on n_tables == N_TABLES, and
+     * TBL_INTERNAL being the last table. */
+    BUILD_ASSERT_DECL(N_TABLES == TBL_INTERNAL + 1);
+
+    miss_config = OFPUTIL_TABLE_MISS_CONTINUE;
+
     for (next_id = *table_id;
          next_id < ofproto->up.n_tables;
          next_id++, next_id += (next_id == TBL_INTERNAL))
     {
         *table_id = next_id;
-        *rule = rule_dpif_lookup_in_table(ofproto, *table_id, flow, wc);
-        if (*rule) {
-            return RULE_DPIF_LOOKUP_VERDICT_MATCH;
-        } else if (!honor_table_miss) {
-            return RULE_DPIF_LOOKUP_VERDICT_CONTROLLER;
-        } else {
-            switch (ofproto_table_get_config(&ofproto->up, *table_id)) {
-            case OFPROTO_TABLE_MISS_CONTINUE:
-                break;
-
-            case OFPROTO_TABLE_MISS_CONTROLLER:
-                return RULE_DPIF_LOOKUP_VERDICT_CONTROLLER;
-
-            case OFPROTO_TABLE_MISS_DROP:
-                return RULE_DPIF_LOOKUP_VERDICT_DROP;
+        rule = rule_dpif_lookup_in_table(ofproto, version, next_id, flow, wc);
+        if (stats) {
+            struct oftable *tbl = &ofproto->up.tables[next_id];
+            unsigned long orig;
 
-            case OFPROTO_TABLE_MISS_DEFAULT:
-                return RULE_DPIF_LOOKUP_VERDICT_DEFAULT;
+            atomic_add_relaxed(rule ? &tbl->n_matched : &tbl->n_missed,
+                               stats->n_packets, &orig);
+        }
+        if (rule) {
+            goto out;   /* Match. */
+        }
+        if (honor_table_miss) {
+            miss_config = ofproto_table_get_miss_config(&ofproto->up,
+                                                        *table_id);
+            if (miss_config == OFPUTIL_TABLE_MISS_CONTINUE) {
+                continue;
+            }
+        }
+        break;
+    }
+    /* Miss. */
+    rule = ofproto->no_packet_in_rule;
+    if (may_packet_in) {
+        if (miss_config == OFPUTIL_TABLE_MISS_CONTINUE
+            || miss_config == OFPUTIL_TABLE_MISS_CONTROLLER) {
+            struct ofport_dpif *port;
+
+            port = ofp_port_to_ofport(ofproto, old_in_port);
+            if (!port) {
+                VLOG_WARN_RL(&rl, "packet-in on unknown OpenFlow port %"PRIu16,
+                             old_in_port);
+            } else if (!(port->up.pp.config & OFPUTIL_PC_NO_PACKET_IN)) {
+                rule = ofproto->miss_rule;
             }
+        } else if (miss_config == OFPUTIL_TABLE_MISS_DEFAULT &&
+                   connmgr_wants_packet_in_on_miss(ofproto->up.connmgr)) {
+            rule = ofproto->miss_rule;
         }
     }
+out:
+    /* Restore port numbers, as they may have been modified above. */
+    flow->tp_src = old_tp_src;
+    flow->tp_dst = old_tp_dst;
+    /* Restore the old in port. */
+    flow->in_port.ofp_port = old_in_port;
 
-    return RULE_DPIF_LOOKUP_VERDICT_CONTROLLER;
-}
-
-/* Given a port configuration (specified as zero if there's no port), chooses
- * which of 'miss_rule' and 'no_packet_in_rule' should be used in case of a
- * flow table miss. */
-void
-choose_miss_rule(enum ofputil_port_config config, struct rule_dpif *miss_rule,
-                 struct rule_dpif *no_packet_in_rule, struct rule_dpif **rule)
-{
-    *rule = config & OFPUTIL_PC_NO_PACKET_IN ? no_packet_in_rule : miss_rule;
-    rule_dpif_ref(*rule);
-}
-
-void
-rule_dpif_ref(struct rule_dpif *rule)
-{
-    if (rule) {
-        ofproto_rule_ref(&rule->up);
-    }
-}
-
-void
-rule_dpif_unref(struct rule_dpif *rule)
-{
-    if (rule) {
-        ofproto_rule_unref(&rule->up);
-    }
+    return rule;
 }
 
 static void
@@ -3397,7 +3996,6 @@ complete_operation(struct rule_dpif *rule)
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
 
     ofproto->backer->need_revalidate = REV_FLOW_TABLE;
-    ofoperation_complete(rule->up.pending, 0);
 }
 
 static struct rule_dpif *rule_dpif_cast(const struct rule *rule)
@@ -3408,7 +4006,7 @@ static struct rule_dpif *rule_dpif_cast(const struct rule *rule)
 static struct rule *
 rule_alloc(void)
 {
-    struct rule_dpif *rule = xmalloc(sizeof *rule);
+    struct rule_dpif *rule = xzalloc(sizeof *rule);
     return &rule->up;
 }
 
@@ -3419,23 +4017,139 @@ rule_dealloc(struct rule *rule_)
     free(rule);
 }
 
+static enum ofperr
+check_mask(struct ofproto_dpif *ofproto, const struct miniflow *flow)
+{
+    const struct odp_support *support;
+    uint16_t ct_state, ct_zone;
+    ovs_u128 ct_label;
+    uint32_t ct_mark;
+
+    support = &ofproto_dpif_get_support(ofproto)->odp;
+    ct_state = MINIFLOW_GET_U16(flow, ct_state);
+    if (support->ct_state && support->ct_zone && support->ct_mark
+        && support->ct_label && support->ct_state_nat) {
+        return ct_state & CS_UNSUPPORTED_MASK ? OFPERR_OFPBMC_BAD_MASK : 0;
+    }
+
+    ct_zone = MINIFLOW_GET_U16(flow, ct_zone);
+    ct_mark = MINIFLOW_GET_U32(flow, ct_mark);
+    ct_label = MINIFLOW_GET_U128(flow, ct_label);
+
+    if ((ct_state && !support->ct_state)
+        || (ct_state & CS_UNSUPPORTED_MASK)
+        || ((ct_state & (CS_SRC_NAT | CS_DST_NAT)) && !support->ct_state_nat)
+        || (ct_zone && !support->ct_zone)
+        || (ct_mark && !support->ct_mark)
+        || (!ovs_u128_is_zero(&ct_label) && !support->ct_label)) {
+        return OFPERR_OFPBMC_BAD_MASK;
+    }
+
+    return 0;
+}
+
+static enum ofperr
+check_actions(const struct ofproto_dpif *ofproto,
+              const struct rule_actions *const actions)
+{
+    const struct ofpact *ofpact;
+
+    OFPACT_FOR_EACH (ofpact, actions->ofpacts, actions->ofpacts_len) {
+        const struct odp_support *support;
+        const struct ofpact_conntrack *ct;
+        const struct ofpact *a;
+
+        if (ofpact->type != OFPACT_CT) {
+            continue;
+        }
+
+        ct = CONTAINER_OF(ofpact, struct ofpact_conntrack, ofpact);
+        support = &ofproto_dpif_get_support(ofproto)->odp;
+
+        if (!support->ct_state) {
+            return OFPERR_OFPBAC_BAD_TYPE;
+        }
+        if ((ct->zone_imm || ct->zone_src.field) && !support->ct_zone) {
+            return OFPERR_OFPBAC_BAD_ARGUMENT;
+        }
+
+        OFPACT_FOR_EACH(a, ct->actions, ofpact_ct_get_action_len(ct)) {
+            const struct mf_field *dst = ofpact_get_mf_dst(a);
+
+            if (a->type == OFPACT_NAT && !support->ct_state_nat) {
+                /* The backer doesn't seem to support the NAT bits in
+                 * 'ct_state': assume that it doesn't support the NAT
+                 * action. */
+                return OFPERR_OFPBAC_BAD_TYPE;
+            }
+            if (dst && ((dst->id == MFF_CT_MARK && !support->ct_mark)
+                        || (dst->id == MFF_CT_LABEL && !support->ct_label))) {
+                return OFPERR_OFPBAC_BAD_SET_ARGUMENT;
+            }
+        }
+    }
+
+    return 0;
+}
+
+static enum ofperr
+rule_check(struct rule *rule)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->ofproto);
+    enum ofperr err;
+
+    err = check_mask(ofproto, &rule->cr.match.mask->masks);
+    if (err) {
+        return err;
+    }
+    return check_actions(ofproto, rule->actions);
+}
+
 static enum ofperr
 rule_construct(struct rule *rule_)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct rule_dpif *rule = rule_dpif_cast(rule_);
+    int error;
+
+    error = rule_check(rule_);
+    if (error) {
+        return error;
+    }
+
     ovs_mutex_init_adaptive(&rule->stats_mutex);
     rule->stats.n_packets = 0;
     rule->stats.n_bytes = 0;
     rule->stats.used = rule->up.modified;
+    rule->recirc_id = 0;
+    rule->new_rule = NULL;
+
     return 0;
 }
 
 static void
-rule_insert(struct rule *rule_)
+rule_insert(struct rule *rule_, struct rule *old_rule_, bool forward_stats)
     OVS_REQUIRES(ofproto_mutex)
 {
     struct rule_dpif *rule = rule_dpif_cast(rule_);
+
+    if (old_rule_ && forward_stats) {
+        struct rule_dpif *old_rule = rule_dpif_cast(old_rule_);
+
+        ovs_assert(!old_rule->new_rule);
+
+        /* Take a reference to the new rule, and refer all stats updates from
+         * the old rule to the new rule. */
+        rule_dpif_ref(rule);
+
+        ovs_mutex_lock(&old_rule->stats_mutex);
+        ovs_mutex_lock(&rule->stats_mutex);
+        old_rule->new_rule = rule;       /* Forward future stats. */
+        rule->stats = old_rule->stats;   /* Transfer stats to the new rule. */
+        ovs_mutex_unlock(&rule->stats_mutex);
+        ovs_mutex_unlock(&old_rule->stats_mutex);
+    }
+
     complete_operation(rule);
 }
 
@@ -3449,9 +4163,18 @@ rule_delete(struct rule *rule_)
 
 static void
 rule_destruct(struct rule *rule_)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct rule_dpif *rule = rule_dpif_cast(rule_);
+
     ovs_mutex_destroy(&rule->stats_mutex);
+    /* Release reference to the new rule, if any. */
+    if (rule->new_rule) {
+        rule_dpif_unref(rule->new_rule);
+    }
+    if (rule->recirc_id) {
+        recirc_free_id(rule->recirc_id);
+    }
 }
 
 static void
@@ -3461,15 +4184,19 @@ rule_get_stats(struct rule *rule_, uint64_t *packets, uint64_t *bytes,
     struct rule_dpif *rule = rule_dpif_cast(rule_);
 
     ovs_mutex_lock(&rule->stats_mutex);
-    *packets = rule->stats.n_packets;
-    *bytes = rule->stats.n_bytes;
-    *used = rule->stats.used;
+    if (OVS_UNLIKELY(rule->new_rule)) {
+        rule_get_stats(&rule->new_rule->up, packets, bytes, used);
+    } else {
+        *packets = rule->stats.n_packets;
+        *bytes = rule->stats.n_bytes;
+        *used = rule->stats.used;
+    }
     ovs_mutex_unlock(&rule->stats_mutex);
 }
 
 static void
 rule_dpif_execute(struct rule_dpif *rule, const struct flow *flow,
-                  struct ofpbuf *packet)
+                  struct dp_packet *packet)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(rule->up.ofproto);
 
@@ -3478,29 +4205,13 @@ rule_dpif_execute(struct rule_dpif *rule, const struct flow *flow,
 
 static enum ofperr
 rule_execute(struct rule *rule, const struct flow *flow,
-             struct ofpbuf *packet)
+             struct dp_packet *packet)
 {
     rule_dpif_execute(rule_dpif_cast(rule), flow, packet);
-    ofpbuf_delete(packet);
+    dp_packet_delete(packet);
     return 0;
 }
 
-static void
-rule_modify_actions(struct rule *rule_, bool reset_counters)
-    OVS_REQUIRES(ofproto_mutex)
-{
-    struct rule_dpif *rule = rule_dpif_cast(rule_);
-
-    if (reset_counters) {
-        ovs_mutex_lock(&rule->stats_mutex);
-        rule->stats.n_packets = 0;
-        rule->stats.n_bytes = 0;
-        ovs_mutex_unlock(&rule->stats_mutex);
-    }
-
-    complete_operation(rule);
-}
-
 static struct group_dpif *group_dpif_cast(const struct ofgroup *group)
 {
     return group ? CONTAINER_OF(group, struct group_dpif, up) : NULL;
@@ -3524,34 +4235,46 @@ static void
 group_construct_stats(struct group_dpif *group)
     OVS_REQUIRES(group->stats_mutex)
 {
+    struct ofputil_bucket *bucket;
+    const struct ovs_list *buckets;
+
     group->packet_count = 0;
     group->byte_count = 0;
-    if (!group->bucket_stats) {
-        group->bucket_stats = xcalloc(group->up.n_buckets,
-                                      sizeof *group->bucket_stats);
-    } else {
-        memset(group->bucket_stats, 0, group->up.n_buckets *
-               sizeof *group->bucket_stats);
+
+    group_dpif_get_buckets(group, &buckets);
+    LIST_FOR_EACH (bucket, list_node, buckets) {
+        bucket->stats.packet_count = 0;
+        bucket->stats.byte_count = 0;
+    }
+}
+
+void
+group_dpif_credit_stats(struct group_dpif *group,
+                        struct ofputil_bucket *bucket,
+                        const struct dpif_flow_stats *stats)
+{
+    ovs_mutex_lock(&group->stats_mutex);
+    group->packet_count += stats->n_packets;
+    group->byte_count += stats->n_bytes;
+    if (bucket) {
+        bucket->stats.packet_count += stats->n_packets;
+        bucket->stats.byte_count += stats->n_bytes;
+    } else { /* Credit to all buckets */
+        const struct ovs_list *buckets;
+
+        group_dpif_get_buckets(group, &buckets);
+        LIST_FOR_EACH (bucket, list_node, buckets) {
+            bucket->stats.packet_count += stats->n_packets;
+            bucket->stats.byte_count += stats->n_bytes;
+        }
     }
+    ovs_mutex_unlock(&group->stats_mutex);
 }
 
 static enum ofperr
 group_construct(struct ofgroup *group_)
 {
     struct group_dpif *group = group_dpif_cast(group_);
-    const struct ofputil_bucket *bucket;
-
-    /* Prevent group chaining because our locking structure makes it hard to
-     * implement deadlock-free.  (See xlate_group_resource_check().) */
-    LIST_FOR_EACH (bucket, list_node, &group->up.buckets) {
-        const struct ofpact *a;
-
-        OFPACT_FOR_EACH (a, bucket->ofpacts, bucket->ofpacts_len) {
-            if (a->type == OFPACT_GROUP) {
-                return OFPERR_OFPGMFC_CHAINING_UNSUPPORTED;
-            }
-        }
-    }
 
     ovs_mutex_init_adaptive(&group->stats_mutex);
     ovs_mutex_lock(&group->stats_mutex);
@@ -3560,37 +4283,17 @@ group_construct(struct ofgroup *group_)
     return 0;
 }
 
-static void
-group_destruct__(struct group_dpif *group)
-    OVS_REQUIRES(group->stats_mutex)
-{
-    free(group->bucket_stats);
-    group->bucket_stats = NULL;
-}
-
 static void
 group_destruct(struct ofgroup *group_)
 {
     struct group_dpif *group = group_dpif_cast(group_);
-    ovs_mutex_lock(&group->stats_mutex);
-    group_destruct__(group);
-    ovs_mutex_unlock(&group->stats_mutex);
     ovs_mutex_destroy(&group->stats_mutex);
 }
 
 static enum ofperr
-group_modify(struct ofgroup *group_, struct ofgroup *victim_)
+group_modify(struct ofgroup *group_)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(group_->ofproto);
-    struct group_dpif *group = group_dpif_cast(group_);
-    struct group_dpif *victim = group_dpif_cast(victim_);
-
-    ovs_mutex_lock(&group->stats_mutex);
-    if (victim->up.n_buckets < group->up.n_buckets) {
-        group_destruct__(group);
-    }
-    group_construct_stats(group);
-    ovs_mutex_unlock(&group->stats_mutex);
 
     ofproto->backer->need_revalidate = REV_FLOW_TABLE;
 
@@ -3601,42 +4304,46 @@ static enum ofperr
 group_get_stats(const struct ofgroup *group_, struct ofputil_group_stats *ogs)
 {
     struct group_dpif *group = group_dpif_cast(group_);
+    struct ofputil_bucket *bucket;
+    const struct ovs_list *buckets;
+    struct bucket_counter *bucket_stats;
 
     ovs_mutex_lock(&group->stats_mutex);
     ogs->packet_count = group->packet_count;
     ogs->byte_count = group->byte_count;
-    memcpy(ogs->bucket_stats, group->bucket_stats,
-           group->up.n_buckets * sizeof *group->bucket_stats);
+
+    group_dpif_get_buckets(group, &buckets);
+    bucket_stats = ogs->bucket_stats;
+    LIST_FOR_EACH (bucket, list_node, buckets) {
+        bucket_stats->packet_count = bucket->stats.packet_count;
+        bucket_stats->byte_count = bucket->stats.byte_count;
+        bucket_stats++;
+    }
     ovs_mutex_unlock(&group->stats_mutex);
 
     return 0;
 }
 
+/* If the group exists, this function increments the groups's reference count.
+ *
+ * Make sure to call group_dpif_unref() after no longer needing to maintain
+ * a reference to the group. */
 bool
 group_dpif_lookup(struct ofproto_dpif *ofproto, uint32_t group_id,
                   struct group_dpif **group)
-    OVS_TRY_RDLOCK(true, (*group)->up.rwlock)
 {
     struct ofgroup *ofgroup;
     bool found;
 
-    *group = NULL;
     found = ofproto_group_lookup(&ofproto->up, group_id, &ofgroup);
     *group = found ?  group_dpif_cast(ofgroup) : NULL;
 
     return found;
 }
 
-void
-group_dpif_release(struct group_dpif *group)
-    OVS_RELEASES(group->up.rwlock)
-{
-    ofproto_group_release(&group->up);
-}
-
 void
 group_dpif_get_buckets(const struct group_dpif *group,
-                       const struct list **buckets)
+                       const struct ovs_list **buckets)
 {
     *buckets = &group->up.buckets;
 }
@@ -3646,12 +4353,18 @@ group_dpif_get_type(const struct group_dpif *group)
 {
     return group->up.type;
 }
+
+const char *
+group_dpif_get_selection_method(const struct group_dpif *group)
+{
+    return group->up.props.selection_method;
+}
 \f
 /* Sends 'packet' out 'ofport'.
  * May modify 'packet'.
  * Returns 0 if successful, otherwise a positive errno value. */
 int
-ofproto_dpif_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet)
+ofproto_dpif_send_packet(const struct ofport_dpif *ofport, struct dp_packet *packet)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofport->up.ofproto);
     int error;
@@ -3660,17 +4373,40 @@ ofproto_dpif_send_packet(const struct ofport_dpif *ofport, struct ofpbuf *packet
 
     ovs_mutex_lock(&ofproto->stats_mutex);
     ofproto->stats.tx_packets++;
-    ofproto->stats.tx_bytes += ofpbuf_size(packet);
+    ofproto->stats.tx_bytes += dp_packet_size(packet);
     ovs_mutex_unlock(&ofproto->stats_mutex);
     return error;
 }
+
+uint64_t
+group_dpif_get_selection_method_param(const struct group_dpif *group)
+{
+    return group->up.props.selection_method_param;
+}
+
+const struct field_array *
+group_dpif_get_fields(const struct group_dpif *group)
+{
+    return &group->up.props.fields;
+}
 \f
+/* Return the version string of the datapath that backs up
+ * this 'ofproto'.
+ */
+static const char *
+get_datapath_version(const struct ofproto *ofproto_)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    return ofproto->backer->dp_version_string;
+}
+
 static bool
 set_frag_handling(struct ofproto *ofproto_,
-                  enum ofp_config_flags frag_handling)
+                  enum ofputil_frag_handling frag_handling)
 {
     struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
-    if (frag_handling != OFPC_FRAG_REASM) {
+    if (frag_handling != OFPUTIL_FRAG_REASM) {
         ofproto->backer->need_revalidate = REV_RECONFIGURE;
         return true;
     } else {
@@ -3679,7 +4415,7 @@ set_frag_handling(struct ofproto *ofproto_,
 }
 
 static enum ofperr
-packet_out(struct ofproto *ofproto_, struct ofpbuf *packet,
+packet_out(struct ofproto *ofproto_, struct dp_packet *packet,
            const struct flow *flow,
            const struct ofpact *ofpacts, size_t ofpacts_len)
 {
@@ -3689,6 +4425,39 @@ packet_out(struct ofproto *ofproto_, struct ofpbuf *packet,
                                  ofpacts_len, packet);
     return 0;
 }
+
+static enum ofperr
+nxt_resume(struct ofproto *ofproto_,
+           const struct ofputil_packet_in_private *pin)
+{
+    struct ofproto_dpif *ofproto = ofproto_dpif_cast(ofproto_);
+
+    /* Translate pin into datapath actions. */
+    uint64_t odp_actions_stub[1024 / 8];
+    struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
+    enum slow_path_reason slow;
+    enum ofperr error = xlate_resume(ofproto, pin, &odp_actions, &slow);
+
+    /* Steal 'pin->packet' and put it into a dp_packet. */
+    struct dp_packet packet;
+    dp_packet_init(&packet, pin->public.packet_len);
+    dp_packet_put(&packet, pin->public.packet, pin->public.packet_len);
+
+    /* Execute the datapath actions on the packet. */
+    struct dpif_execute execute = {
+        .actions = odp_actions.data,
+        .actions_len = odp_actions.size,
+        .needs_help = (slow & SLOW_ACTION) != 0,
+        .packet = &packet,
+    };
+    dpif_execute(ofproto->backer->dpif, &execute);
+
+    /* Clean up. */
+    ofpbuf_uninit(&odp_actions);
+    dp_packet_uninit(&packet);
+
+    return error;
+}
 \f
 /* NetFlow. */
 
@@ -3762,6 +4531,36 @@ ofproto_unixctl_fdb_flush(struct unixctl_conn *conn, int argc,
     unixctl_command_reply(conn, "table successfully flushed");
 }
 
+static void
+ofproto_unixctl_mcast_snooping_flush(struct unixctl_conn *conn, int argc,
+                                     const char *argv[], void *aux OVS_UNUSED)
+{
+    struct ofproto_dpif *ofproto;
+
+    if (argc > 1) {
+        ofproto = ofproto_dpif_lookup(argv[1]);
+        if (!ofproto) {
+            unixctl_command_reply_error(conn, "no such bridge");
+            return;
+        }
+
+        if (!mcast_snooping_enabled(ofproto->ms)) {
+            unixctl_command_reply_error(conn, "multicast snooping is disabled");
+            return;
+        }
+        mcast_snooping_mdb_flush(ofproto->ms);
+    } else {
+        HMAP_FOR_EACH (ofproto, all_ofproto_dpifs_node, &all_ofproto_dpifs) {
+            if (!mcast_snooping_enabled(ofproto->ms)) {
+                continue;
+            }
+            mcast_snooping_mdb_flush(ofproto->ms);
+        }
+    }
+
+    unixctl_command_reply(conn, "table successfully flushed");
+}
+
 static struct ofport_dpif *
 ofbundle_get_a_port(const struct ofbundle *bundle)
 {
@@ -3786,7 +4585,7 @@ ofproto_unixctl_fdb_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
     ds_put_cstr(&ds, " port  VLAN  MAC                Age\n");
     ovs_rwlock_rdlock(&ofproto->ml->rwlock);
     LIST_FOR_EACH (e, lru_node, &ofproto->ml->lrus) {
-        struct ofbundle *bundle = e->port.p;
+        struct ofbundle *bundle = mac_entry_get_port(ofproto->ml, e);
         char name[OFP_MAX_PORT_NAME_LEN];
 
         ofputil_port_to_string(ofbundle_get_a_port(bundle)->up.ofp_port,
@@ -3800,19 +4599,76 @@ ofproto_unixctl_fdb_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
     ds_destroy(&ds);
 }
 
+static void
+ofproto_unixctl_mcast_snooping_show(struct unixctl_conn *conn,
+                                    int argc OVS_UNUSED,
+                                    const char *argv[],
+                                    void *aux OVS_UNUSED)
+{
+    struct ds ds = DS_EMPTY_INITIALIZER;
+    const struct ofproto_dpif *ofproto;
+    const struct ofbundle *bundle;
+    const struct mcast_group *grp;
+    struct mcast_group_bundle *b;
+    struct mcast_mrouter_bundle *mrouter;
+
+    ofproto = ofproto_dpif_lookup(argv[1]);
+    if (!ofproto) {
+        unixctl_command_reply_error(conn, "no such bridge");
+        return;
+    }
+
+    if (!mcast_snooping_enabled(ofproto->ms)) {
+        unixctl_command_reply_error(conn, "multicast snooping is disabled");
+        return;
+    }
+
+    ds_put_cstr(&ds, " port  VLAN  GROUP                Age\n");
+    ovs_rwlock_rdlock(&ofproto->ms->rwlock);
+    LIST_FOR_EACH (grp, group_node, &ofproto->ms->group_lru) {
+        LIST_FOR_EACH(b, bundle_node, &grp->bundle_lru) {
+            char name[OFP_MAX_PORT_NAME_LEN];
+
+            bundle = b->port;
+            ofputil_port_to_string(ofbundle_get_a_port(bundle)->up.ofp_port,
+                                   name, sizeof name);
+            ds_put_format(&ds, "%5s  %4d  ", name, grp->vlan);
+            ipv6_format_mapped(&grp->addr, &ds);
+            ds_put_format(&ds, "         %3d\n",
+                          mcast_bundle_age(ofproto->ms, b));
+        }
+    }
+
+    /* ports connected to multicast routers */
+    LIST_FOR_EACH(mrouter, mrouter_node, &ofproto->ms->mrouter_lru) {
+        char name[OFP_MAX_PORT_NAME_LEN];
+
+        bundle = mrouter->port;
+        ofputil_port_to_string(ofbundle_get_a_port(bundle)->up.ofp_port,
+                               name, sizeof name);
+        ds_put_format(&ds, "%5s  %4d  querier             %3d\n",
+                      name, mrouter->vlan,
+                      mcast_mrouter_age(ofproto->ms, mrouter));
+    }
+    ovs_rwlock_unlock(&ofproto->ms->rwlock);
+    unixctl_command_reply(conn, ds_cstr(&ds));
+    ds_destroy(&ds);
+}
+
 struct trace_ctx {
     struct xlate_out xout;
     struct xlate_in xin;
     const struct flow *key;
     struct flow flow;
-    struct flow_wildcards wc;
     struct ds *result;
+    struct flow_wildcards wc;
+    struct ofpbuf odp_actions;
 };
 
 static void
 trace_format_rule(struct ds *result, int level, const struct rule_dpif *rule)
 {
-    struct rule_actions *actions;
+    const struct rule_actions *actions;
     ovs_be64 cookie;
 
     ds_put_char_multiple(result, '\t', level);
@@ -3873,12 +4729,11 @@ static void
 trace_format_odp(struct ds *result, int level, const char *title,
                  struct trace_ctx *trace)
 {
-    struct ofpbuf *odp_actions = &trace->xout.odp_actions;
+    struct ofpbuf *odp_actions = &trace->odp_actions;
 
     ds_put_char_multiple(result, '\t', level);
     ds_put_format(result, "%s: ", title);
-    format_odp_actions(result, ofpbuf_data(odp_actions),
-                               ofpbuf_size(odp_actions));
+    format_odp_actions(result, odp_actions->data, odp_actions->size);
     ds_put_char(result, '\n');
 }
 
@@ -3890,37 +4745,70 @@ trace_format_megaflow(struct ds *result, int level, const char *title,
 
     ds_put_char_multiple(result, '\t', level);
     ds_put_format(result, "%s: ", title);
-    flow_wildcards_or(&trace->wc, &trace->xout.wc, &trace->wc);
     match_init(&match, trace->key, &trace->wc);
     match_format(&match, result, OFP_DEFAULT_PRIORITY);
     ds_put_char(result, '\n');
 }
 
+static void trace_report(struct xlate_in *, int recurse,
+                         const char *format, ...)
+    OVS_PRINTF_FORMAT(3, 4);
+static void trace_report_valist(struct xlate_in *, int recurse,
+                                const char *format, va_list args)
+    OVS_PRINTF_FORMAT(3, 0);
+
 static void
 trace_resubmit(struct xlate_in *xin, struct rule_dpif *rule, int recurse)
 {
     struct trace_ctx *trace = CONTAINER_OF(xin, struct trace_ctx, xin);
     struct ds *result = trace->result;
 
+    if (!recurse) {
+        if (rule == xin->ofproto->miss_rule) {
+            trace_report(xin, recurse,
+                         "No match, flow generates \"packet in\"s.");
+        } else if (rule == xin->ofproto->no_packet_in_rule) {
+            trace_report(xin, recurse, "No match, packets dropped because "
+                         "OFPPC_NO_PACKET_IN is set on in_port.");
+        } else if (rule == xin->ofproto->drop_frags_rule) {
+            trace_report(xin, recurse, "Packets dropped because they are IP "
+                         "fragments and the fragment handling mode is "
+                         "\"drop\".");
+        }
+    }
+
     ds_put_char(result, '\n');
-    trace_format_flow(result, recurse + 1, "Resubmitted flow", trace);
-    trace_format_regs(result, recurse + 1, "Resubmitted regs", trace);
-    trace_format_odp(result,  recurse + 1, "Resubmitted  odp", trace);
-    trace_format_megaflow(result, recurse + 1, "Resubmitted megaflow", trace);
-    trace_format_rule(result, recurse + 1, rule);
+    if (recurse) {
+        trace_format_flow(result, recurse, "Resubmitted flow", trace);
+        trace_format_regs(result, recurse, "Resubmitted regs", trace);
+        trace_format_odp(result,  recurse, "Resubmitted  odp", trace);
+        trace_format_megaflow(result, recurse, "Resubmitted megaflow", trace);
+    }
+    trace_format_rule(result, recurse, rule);
 }
 
 static void
-trace_report(struct xlate_in *xin, const char *s, int recurse)
+trace_report_valist(struct xlate_in *xin, int recurse,
+                    const char *format, va_list args)
 {
     struct trace_ctx *trace = CONTAINER_OF(xin, struct trace_ctx, xin);
     struct ds *result = trace->result;
 
     ds_put_char_multiple(result, '\t', recurse);
-    ds_put_cstr(result, s);
+    ds_put_format_valist(result, format, args);
     ds_put_char(result, '\n');
 }
 
+static void
+trace_report(struct xlate_in *xin, int recurse, const char *format, ...)
+{
+    va_list args;
+
+    va_start(args, format);
+    trace_report_valist(xin, recurse, format, args);
+    va_end(args);
+}
+
 /* Parses the 'argc' elements of 'argv', ignoring argv[0].  The following
  * forms are supported:
  *
@@ -3929,16 +4817,16 @@ trace_report(struct xlate_in *xin, const char *s, int recurse)
  *
  * On success, initializes '*ofprotop' and 'flow' and returns NULL.  On failure
  * returns a nonnull malloced error message. */
-static char * WARN_UNUSED_RESULT
+static char * OVS_WARN_UNUSED_RESULT
 parse_flow_and_packet(int argc, const char *argv[],
                       struct ofproto_dpif **ofprotop, struct flow *flow,
-                      struct ofpbuf **packetp)
+                      struct dp_packet **packetp)
 {
     const struct dpif_backer *backer = NULL;
     const char *error = NULL;
     char *m_err = NULL;
     struct simap port_names = SIMAP_INITIALIZER(&port_names);
-    struct ofpbuf *packet;
+    struct dp_packet *packet;
     struct ofpbuf odp_key;
     struct ofpbuf odp_mask;
 
@@ -3947,7 +4835,7 @@ parse_flow_and_packet(int argc, const char *argv[],
 
     /* Handle "-generate" or a hex string as the last argument. */
     if (!strcmp(argv[argc - 1], "-generate")) {
-        packet = ofpbuf_new(0);
+        packet = dp_packet_new(0);
         argc--;
     } else {
         error = eth_from_hex(argv[argc - 1], &packet);
@@ -4002,17 +4890,25 @@ parse_flow_and_packet(int argc, const char *argv[],
             goto exit;
         }
 
-        if (xlate_receive(backer, NULL, ofpbuf_data(&odp_key),
-                          ofpbuf_size(&odp_key), flow,
-                          ofprotop, NULL, NULL, NULL, NULL)) {
+        if (odp_flow_key_to_flow(odp_key.data, odp_key.size, flow) == ODP_FIT_ERROR) {
+            error = "Failed to parse datapath flow key";
+            goto exit;
+        }
+
+        *ofprotop = xlate_lookup_ofproto(backer, flow,
+                                         &flow->in_port.ofp_port);
+        if (*ofprotop == NULL) {
             error = "Invalid datapath flow";
             goto exit;
         }
+
+        vsp_adjust_flow(*ofprotop, flow, NULL);
+
     } else {
         char *err = parse_ofp_exact_flow(flow, NULL, argv[argc - 1], NULL);
 
         if (err) {
-            m_err = xasprintf("Bad flow syntax: %s", err);
+            m_err = xasprintf("Bad openflow flow syntax: %s", err);
             free(err);
             goto exit;
         } else {
@@ -4031,14 +4927,13 @@ parse_flow_and_packet(int argc, const char *argv[],
 
     /* Generate a packet, if requested. */
     if (packet) {
-        if (!ofpbuf_size(packet)) {
+        if (!dp_packet_size(packet)) {
             flow_compose(packet, flow);
         } else {
-            struct pkt_metadata md = pkt_metadata_from_flow(flow);
-
             /* Use the metadata from the flow and the packet argument
              * to reconstruct the flow. */
-            flow_extract(packet, &md, flow);
+            pkt_metadata_from_flow(&packet->md, flow);
+            flow_extract(packet, flow);
         }
     }
 
@@ -4047,7 +4942,7 @@ exit:
         m_err = xstrdup(error);
     }
     if (m_err) {
-        ofpbuf_delete(packet);
+        dp_packet_delete(packet);
         packet = NULL;
     }
     *packetp = packet;
@@ -4062,7 +4957,7 @@ ofproto_unixctl_trace(struct unixctl_conn *conn, int argc, const char *argv[],
                       void *aux OVS_UNUSED)
 {
     struct ofproto_dpif *ofproto;
-    struct ofpbuf *packet;
+    struct dp_packet *packet;
     char *error;
     struct flow flow;
 
@@ -4074,7 +4969,7 @@ ofproto_unixctl_trace(struct unixctl_conn *conn, int argc, const char *argv[],
         ofproto_trace(ofproto, &flow, packet, NULL, 0, &result);
         unixctl_command_reply(conn, ds_cstr(&result));
         ds_destroy(&result);
-        ofpbuf_delete(packet);
+        dp_packet_delete(packet);
     } else {
         unixctl_command_reply_error(conn, error);
         free(error);
@@ -4089,7 +4984,7 @@ ofproto_unixctl_trace_actions(struct unixctl_conn *conn, int argc,
     struct ofproto_dpif *ofproto;
     bool enforce_consistency;
     struct ofpbuf ofpacts;
-    struct ofpbuf *packet;
+    struct dp_packet *packet;
     struct ds result;
     struct flow flow;
     uint16_t in_port;
@@ -4103,7 +4998,7 @@ ofproto_unixctl_trace_actions(struct unixctl_conn *conn, int argc,
     ofpbuf_init(&ofpacts, 0);
 
     /* Parse actions. */
-    error = parse_ofpacts(argv[--argc], &ofpacts, &usable_protocols);
+    error = ofpacts_parse_actions(argv[--argc], &ofpacts, &usable_protocols);
     if (error) {
         unixctl_command_reply_error(conn, error);
         free(error);
@@ -4130,7 +5025,7 @@ ofproto_unixctl_trace_actions(struct unixctl_conn *conn, int argc,
 
     /* Do the same checks as handle_packet_out() in ofproto.c.
      *
-     * We pass a 'table_id' of 0 to ofproto_check_ofpacts(), which isn't
+     * We pass a 'table_id' of 0 to ofpacts_check(), which isn't
      * strictly correct because these actions aren't in any table, but it's OK
      * because it 'table_id' is used only to check goto_table instructions, but
      * packet-outs take a list of actions and therefore it can't include
@@ -4144,13 +5039,18 @@ ofproto_unixctl_trace_actions(struct unixctl_conn *conn, int argc,
         goto exit;
     }
     if (enforce_consistency) {
-        retval = ofpacts_check_consistency(ofpbuf_data(&ofpacts), ofpbuf_size(&ofpacts),
-                                           &flow, u16_to_ofp(ofproto->up.max_ports),
-                                           0, 0, usable_protocols);
+        retval = ofpacts_check_consistency(ofpacts.data, ofpacts.size, &flow,
+                                           u16_to_ofp(ofproto->up.max_ports),
+                                           0, ofproto->up.n_tables,
+                                           usable_protocols);
     } else {
-        retval = ofpacts_check(ofpbuf_data(&ofpacts), ofpbuf_size(&ofpacts), &flow,
-                               u16_to_ofp(ofproto->up.max_ports), 0, 0,
-                               &usable_protocols);
+        retval = ofpacts_check(ofpacts.data, ofpacts.size, &flow,
+                               u16_to_ofp(ofproto->up.max_ports), 0,
+                               ofproto->up.n_tables, &usable_protocols);
+    }
+    if (!retval) {
+        retval = ofproto_check_ofpacts(&ofproto->up, ofpacts.data,
+                                       ofpacts.size);
     }
 
     if (retval) {
@@ -4161,12 +5061,12 @@ ofproto_unixctl_trace_actions(struct unixctl_conn *conn, int argc,
     }
 
     ofproto_trace(ofproto, &flow, packet,
-                  ofpbuf_data(&ofpacts), ofpbuf_size(&ofpacts), &result);
+                  ofpacts.data, ofpacts.size, &result);
     unixctl_command_reply(conn, ds_cstr(&result));
 
 exit:
     ds_destroy(&result);
-    ofpbuf_delete(packet);
+    dp_packet_delete(packet);
     ofpbuf_uninit(&ofpacts);
 }
 
@@ -4181,80 +5081,61 @@ exit:
  * trace, otherwise the actions are determined by a flow table lookup. */
 static void
 ofproto_trace(struct ofproto_dpif *ofproto, struct flow *flow,
-              const struct ofpbuf *packet,
+              const struct dp_packet *packet,
               const struct ofpact ofpacts[], size_t ofpacts_len,
               struct ds *ds)
 {
-    struct rule_dpif *rule;
     struct trace_ctx trace;
+    enum xlate_error error;
 
     ds_put_format(ds, "Bridge: %s\n", ofproto->up.name);
     ds_put_cstr(ds, "Flow: ");
     flow_format(ds, flow);
     ds_put_char(ds, '\n');
 
-    flow_wildcards_init_catchall(&trace.wc);
-    if (ofpacts) {
-        rule = NULL;
-    } else {
-        rule_dpif_lookup(ofproto, flow, &trace.wc, &rule);
-
-        trace_format_rule(ds, 0, rule);
-        if (rule == ofproto->miss_rule) {
-            ds_put_cstr(ds, "\nNo match, flow generates \"packet in\"s.\n");
-        } else if (rule == ofproto->no_packet_in_rule) {
-            ds_put_cstr(ds, "\nNo match, packets dropped because "
-                        "OFPPC_NO_PACKET_IN is set on in_port.\n");
-        } else if (rule == ofproto->drop_frags_rule) {
-            ds_put_cstr(ds, "\nPackets dropped because they are IP fragments "
-                        "and the fragment handling mode is \"drop\".\n");
-        }
-    }
-
-    if (rule || ofpacts) {
-        trace.result = ds;
-        trace.key = flow; /* Original flow key, used for megaflow. */
-        trace.flow = *flow; /* May be modified by actions. */
-        xlate_in_init(&trace.xin, ofproto, flow, rule, ntohs(flow->tcp_flags),
-                      packet);
-        if (ofpacts) {
-            trace.xin.ofpacts = ofpacts;
-            trace.xin.ofpacts_len = ofpacts_len;
-        }
-        trace.xin.resubmit_hook = trace_resubmit;
-        trace.xin.report_hook = trace_report;
+    ofpbuf_init(&trace.odp_actions, 0);
 
-        xlate_actions(&trace.xin, &trace.xout);
+    trace.result = ds;
+    trace.key = flow; /* Original flow key, used for megaflow. */
+    trace.flow = *flow; /* May be modified by actions. */
+    xlate_in_init(&trace.xin, ofproto, flow, flow->in_port.ofp_port, NULL,
+                  ntohs(flow->tcp_flags), packet, &trace.wc,
+                  &trace.odp_actions);
+    trace.xin.ofpacts = ofpacts;
+    trace.xin.ofpacts_len = ofpacts_len;
+    trace.xin.resubmit_hook = trace_resubmit;
+    trace.xin.report_hook = trace_report_valist;
 
-        ds_put_char(ds, '\n');
-        trace_format_flow(ds, 0, "Final flow", &trace);
-        trace_format_megaflow(ds, 0, "Megaflow", &trace);
+    error = xlate_actions(&trace.xin, &trace.xout);
+    ds_put_char(ds, '\n');
+    trace_format_flow(ds, 0, "Final flow", &trace);
+    trace_format_megaflow(ds, 0, "Megaflow", &trace);
 
-        ds_put_cstr(ds, "Datapath actions: ");
-        format_odp_actions(ds, ofpbuf_data(&trace.xout.odp_actions),
-                           ofpbuf_size(&trace.xout.odp_actions));
+    ds_put_cstr(ds, "Datapath actions: ");
+    format_odp_actions(ds, trace.odp_actions.data, trace.odp_actions.size);
 
-        if (trace.xout.slow) {
-            enum slow_path_reason slow;
+    if (error != XLATE_OK) {
+        ds_put_format(ds, "\nTranslation failed (%s), packet is dropped.\n",
+                      xlate_strerror(error));
+    } else if (trace.xout.slow) {
+        enum slow_path_reason slow;
 
-            ds_put_cstr(ds, "\nThis flow is handled by the userspace "
-                        "slow path because it:");
+        ds_put_cstr(ds, "\nThis flow is handled by the userspace "
+                    "slow path because it:");
 
-            slow = trace.xout.slow;
-            while (slow) {
-                enum slow_path_reason bit = rightmost_1bit(slow);
+        slow = trace.xout.slow;
+        while (slow) {
+            enum slow_path_reason bit = rightmost_1bit(slow);
 
-                ds_put_format(ds, "\n\t- %s.",
-                              slow_path_reason_to_explanation(bit));
+            ds_put_format(ds, "\n\t- %s.",
+                          slow_path_reason_to_explanation(bit));
 
-                slow &= ~bit;
-            }
+            slow &= ~bit;
         }
-
-        xlate_out_uninit(&trace.xout);
     }
 
-    rule_dpif_unref(rule);
+    xlate_out_uninit(&trace.xout);
+    ofpbuf_uninit(&trace.odp_actions);
 }
 
 /* Store the current ofprotos in 'ofproto_shash'.  Returns a sorted list
@@ -4384,38 +5265,23 @@ ofproto_unixctl_dpif_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
     ds_destroy(&ds);
 }
 
-static bool
-ofproto_dpif_contains_flow(const struct ofproto_dpif *ofproto,
-                           const struct nlattr *key, size_t key_len)
-{
-    struct ofproto_dpif *ofp;
-    struct flow flow;
-
-    xlate_receive(ofproto->backer, NULL, key, key_len, &flow, &ofp,
-                  NULL, NULL, NULL, NULL);
-    return ofp == ofproto;
-}
-
 static void
 ofproto_unixctl_dpif_dump_flows(struct unixctl_conn *conn,
                                 int argc OVS_UNUSED, const char *argv[],
                                 void *aux OVS_UNUSED)
 {
-    struct ds ds = DS_EMPTY_INITIALIZER;
-    const struct dpif_flow_stats *stats;
     const struct ofproto_dpif *ofproto;
-    struct dpif_flow_dump flow_dump;
-    const struct nlattr *actions;
-    const struct nlattr *mask;
-    const struct nlattr *key;
-    size_t actions_len;
-    size_t mask_len;
-    size_t key_len;
+
+    struct ds ds = DS_EMPTY_INITIALIZER;
     bool verbosity = false;
+
     struct dpif_port dpif_port;
     struct dpif_port_dump port_dump;
     struct hmap portno_names;
-    void *state = NULL;
+
+    struct dpif_flow_dump *flow_dump;
+    struct dpif_flow_dump_thread *flow_dump_thread;
+    struct dpif_flow f;
     int error;
 
     ofproto = ofproto_dpif_lookup(argv[argc - 1]);
@@ -4434,30 +5300,31 @@ ofproto_unixctl_dpif_dump_flows(struct unixctl_conn *conn,
     }
 
     ds_init(&ds);
-    error = dpif_flow_dump_start(&flow_dump, ofproto->backer->dpif);
-    if (error) {
-        goto exit;
-    }
-    dpif_flow_dump_state_init(ofproto->backer->dpif, &state);
-    while (dpif_flow_dump_next(&flow_dump, state, &key, &key_len,
-                               &mask, &mask_len, &actions, &actions_len,
-                               &stats)) {
-        if (!ofproto_dpif_contains_flow(ofproto, key, key_len)) {
+    flow_dump = dpif_flow_dump_create(ofproto->backer->dpif, false);
+    flow_dump_thread = dpif_flow_dump_thread_create(flow_dump);
+    while (dpif_flow_dump_next(flow_dump_thread, &f, 1)) {
+        struct flow flow;
+
+        if (odp_flow_key_to_flow(f.key, f.key_len, &flow) == ODP_FIT_ERROR
+            || xlate_lookup_ofproto(ofproto->backer, &flow, NULL) != ofproto) {
             continue;
         }
 
-        odp_flow_format(key, key_len, mask, mask_len, &portno_names, &ds,
-                        verbosity);
+        if (verbosity) {
+            odp_format_ufid(&f.ufid, &ds);
+            ds_put_cstr(&ds, " ");
+        }
+        odp_flow_format(f.key, f.key_len, f.mask, f.mask_len,
+                        &portno_names, &ds, verbosity);
         ds_put_cstr(&ds, ", ");
-        dpif_flow_stats_format(stats, &ds);
+        dpif_flow_stats_format(&f.stats, &ds);
         ds_put_cstr(&ds, ", actions:");
-        format_odp_actions(&ds, actions, actions_len);
+        format_odp_actions(&ds, f.actions, f.actions_len);
         ds_put_char(&ds, '\n');
     }
-    dpif_flow_dump_state_uninit(ofproto->backer->dpif, state);
-    error = dpif_flow_dump_done(&flow_dump);
+    dpif_flow_dump_thread_destroy(flow_dump_thread);
+    error = dpif_flow_dump_destroy(flow_dump);
 
-exit:
     if (error) {
         ds_clear(&ds);
         ds_put_format(&ds, "dpif/dump_flows failed: %s", ovs_strerror(errno));
@@ -4471,7 +5338,38 @@ exit:
 }
 
 static void
-ofproto_dpif_unixctl_init(void)
+ofproto_revalidate_all_backers(void)
+{
+    const struct shash_node **backers;
+    int i;
+
+    backers = shash_sort(&all_dpif_backers);
+    for (i = 0; i < shash_count(&all_dpif_backers); i++) {
+        struct dpif_backer *backer = backers[i]->data;
+        backer->need_revalidate = REV_RECONFIGURE;
+    }
+    free(backers);
+}
+
+static void
+disable_tnl_push_pop(struct unixctl_conn *conn OVS_UNUSED, int argc OVS_UNUSED,
+                     const char *argv[], void *aux OVS_UNUSED)
+{
+    if (!strcasecmp(argv[1], "off")) {
+        ofproto_use_tnl_push_pop = false;
+        unixctl_command_reply(conn, "Tunnel push-pop off");
+        ofproto_revalidate_all_backers();
+    } else if (!strcasecmp(argv[1], "on")) {
+        ofproto_use_tnl_push_pop = true;
+        unixctl_command_reply(conn, "Tunnel push-pop on");
+        ofproto_revalidate_all_backers();
+    } else {
+        unixctl_command_reply_error(conn, "Invalid argument");
+    }
+}
+
+static void
+ofproto_unixctl_init(void)
 {
     static bool registered;
     if (registered) {
@@ -4491,20 +5389,27 @@ ofproto_dpif_unixctl_init(void)
                              ofproto_unixctl_fdb_flush, NULL);
     unixctl_command_register("fdb/show", "bridge", 1, 1,
                              ofproto_unixctl_fdb_show, NULL);
+    unixctl_command_register("mdb/flush", "[bridge]", 0, 1,
+                             ofproto_unixctl_mcast_snooping_flush, NULL);
+    unixctl_command_register("mdb/show", "bridge", 1, 1,
+                             ofproto_unixctl_mcast_snooping_show, NULL);
     unixctl_command_register("dpif/dump-dps", "", 0, 0,
                              ofproto_unixctl_dpif_dump_dps, NULL);
     unixctl_command_register("dpif/show", "", 0, 0, ofproto_unixctl_dpif_show,
                              NULL);
     unixctl_command_register("dpif/dump-flows", "[-m] bridge", 1, 2,
                              ofproto_unixctl_dpif_dump_flows, NULL);
-}
 
+    unixctl_command_register("ofproto/tnl-push-pop", "[on]|[off]", 1, 1,
+                             disable_tnl_push_pop, NULL);
+}
 
-/* Returns true if 'rule' is an internal rule, false otherwise. */
+/* Returns true if 'table' is the table used for internal rules,
+ * false otherwise. */
 bool
-rule_is_internal(const struct rule *rule)
+table_is_internal(uint8_t table_id)
 {
-    return rule->table_id == TBL_INTERNAL;
+    return table_id == TBL_INTERNAL;
 }
 \f
 /* Linux VLAN device support (e.g. "eth0.10" for VLAN 10.)
@@ -4653,11 +5558,13 @@ vsp_vlandev_to_realdev(const struct ofproto_dpif *ofproto,
 /* Given 'flow', a flow representing a packet received on 'ofproto', checks
  * whether 'flow->in_port' represents a Linux VLAN device.  If so, changes
  * 'flow->in_port' to the "real" device backing the VLAN device, sets
- * 'flow->vlan_tci' to the VLAN VID, and returns true.  Otherwise (which is
- * always the case unless VLAN splinters are enabled), returns false without
- * making any changes. */
+ * 'flow->vlan_tci' to the VLAN VID, and returns true.  Optionally pushes the
+ * appropriate VLAN on 'packet' if provided.  Otherwise (which is always the
+ * case unless VLAN splinters are enabled), returns false without making any
+ * changes. */
 bool
-vsp_adjust_flow(const struct ofproto_dpif *ofproto, struct flow *flow)
+vsp_adjust_flow(const struct ofproto_dpif *ofproto, struct flow *flow,
+                struct dp_packet *packet)
     OVS_EXCLUDED(ofproto->vsp_mutex)
 {
     ofp_port_t realdev;
@@ -4679,6 +5586,15 @@ vsp_adjust_flow(const struct ofproto_dpif *ofproto, struct flow *flow)
      * the VLAN device's VLAN ID. */
     flow->in_port.ofp_port = realdev;
     flow->vlan_tci = htons((vid & VLAN_VID_MASK) | VLAN_CFI);
+
+    if (packet) {
+        /* Make the packet resemble the flow, so that it gets sent to an
+         * OpenFlow controller properly, so that it looks correct for sFlow,
+         * and so that flow_extract() will get the correct vlan_tci if it is
+         * called on 'packet'. */
+        eth_push_vlan(packet, htons(ETH_TYPE_VLAN), flow->vlan_tci);
+    }
+
     return true;
 }
 
@@ -4733,7 +5649,7 @@ vsp_add(struct ofport_dpif *port, ofp_port_t realdev_ofp_port, int vid)
 static odp_port_t
 ofp_port_to_odp_port(const struct ofproto_dpif *ofproto, ofp_port_t ofp_port)
 {
-    const struct ofport_dpif *ofport = get_ofp_port(ofproto, ofp_port);
+    const struct ofport_dpif *ofport = ofp_port_to_ofport(ofproto, ofp_port);
     return ofport ? ofport->odp_port : ODPP_NONE;
 }
 
@@ -4768,49 +5684,30 @@ odp_port_to_ofp_port(const struct ofproto_dpif *ofproto, odp_port_t odp_port)
     }
 }
 
-uint32_t
-ofproto_dpif_alloc_recirc_id(struct ofproto_dpif *ofproto)
-{
-    struct dpif_backer *backer = ofproto->backer;
-
-    return  recirc_id_alloc(backer->rid_pool);
-}
-
-void
-ofproto_dpif_free_recirc_id(struct ofproto_dpif *ofproto, uint32_t recirc_id)
-{
-    struct dpif_backer *backer = ofproto->backer;
-
-    recirc_id_free(backer->rid_pool, recirc_id);
-}
-
 int
 ofproto_dpif_add_internal_flow(struct ofproto_dpif *ofproto,
-                               struct match *match, int priority,
+                               const struct match *match, int priority,
+                               uint16_t idle_timeout,
                                const struct ofpbuf *ofpacts,
                                struct rule **rulep)
 {
-    struct ofputil_flow_mod fm;
+    struct ofproto_flow_mod ofm;
     struct rule_dpif *rule;
     int error;
 
-    fm.match = *match;
-    fm.priority = priority;
-    fm.new_cookie = htonll(0);
-    fm.cookie = htonll(0);
-    fm.cookie_mask = htonll(0);
-    fm.modify_cookie = false;
-    fm.table_id = TBL_INTERNAL;
-    fm.command = OFPFC_ADD;
-    fm.idle_timeout = 0;
-    fm.hard_timeout = 0;
-    fm.buffer_id = 0;
-    fm.out_port = 0;
-    fm.flags = OFPUTIL_FF_HIDDEN_FIELDS | OFPUTIL_FF_NO_READONLY;
-    fm.ofpacts = ofpbuf_data(ofpacts);
-    fm.ofpacts_len = ofpbuf_size(ofpacts);
-
-    error = ofproto_flow_mod(&ofproto->up, &fm);
+    ofm.fm = (struct ofputil_flow_mod) {
+        .match = *match,
+        .priority = priority,
+        .table_id = TBL_INTERNAL,
+        .command = OFPFC_ADD,
+        .idle_timeout = idle_timeout,
+        .flags = OFPUTIL_FF_HIDDEN_FIELDS | OFPUTIL_FF_NO_READONLY,
+        .ofpacts = ofpacts->data,
+        .ofpacts_len = ofpacts->size,
+        .delete_reason = OVS_OFPRR_NONE,
+    };
+
+    error = ofproto_flow_mod(&ofproto->up, &ofm);
     if (error) {
         VLOG_ERR_RL(&rl, "failed to add internal flow (%s)",
                     ofperr_to_string(error));
@@ -4818,10 +5715,11 @@ ofproto_dpif_add_internal_flow(struct ofproto_dpif *ofproto,
         return error;
     }
 
-    rule = rule_dpif_lookup_in_table(ofproto, TBL_INTERNAL, &match->flow,
-                                     &match->wc);
+    rule = rule_dpif_lookup_in_table(ofproto,
+                                     ofproto_dpif_get_tables_version(ofproto),
+                                     TBL_INTERNAL, &ofm.fm.match.flow,
+                                     &ofm.fm.match.wc);
     if (rule) {
-        rule_dpif_unref(rule);
         *rulep = &rule->up;
     } else {
         OVS_NOT_REACHED();
@@ -4833,20 +5731,18 @@ int
 ofproto_dpif_delete_internal_flow(struct ofproto_dpif *ofproto,
                                   struct match *match, int priority)
 {
-    struct ofputil_flow_mod fm;
+    struct ofproto_flow_mod ofm;
     int error;
 
-    fm.match = *match;
-    fm.priority = priority;
-    fm.new_cookie = htonll(0);
-    fm.cookie = htonll(0);
-    fm.cookie_mask = htonll(0);
-    fm.modify_cookie = false;
-    fm.table_id = TBL_INTERNAL;
-    fm.flags = OFPUTIL_FF_HIDDEN_FIELDS | OFPUTIL_FF_NO_READONLY;
-    fm.command = OFPFC_DELETE_STRICT;
-
-    error = ofproto_flow_mod(&ofproto->up, &fm);
+    ofm.fm = (struct ofputil_flow_mod) {
+        .match = *match,
+        .priority = priority,
+        .table_id = TBL_INTERNAL,
+        .flags = OFPUTIL_FF_HIDDEN_FIELDS | OFPUTIL_FF_NO_READONLY,
+        .command = OFPFC_DELETE_STRICT,
+    };
+
+    error = ofproto_flow_mod(&ofproto->up, &ofm);
     if (error) {
         VLOG_ERR_RL(&rl, "failed to delete internal flow (%s)",
                     ofperr_to_string(error));
@@ -4856,6 +5752,12 @@ ofproto_dpif_delete_internal_flow(struct ofproto_dpif *ofproto,
     return 0;
 }
 
+const struct uuid *
+ofproto_dpif_get_uuid(const struct ofproto_dpif *ofproto)
+{
+    return &ofproto->uuid;
+}
+
 const struct ofproto_class ofproto_dpif_class = {
     init,
     enumerate_types,
@@ -4873,8 +5775,8 @@ const struct ofproto_class ofproto_dpif_class = {
     NULL,                       /* get_memory_usage. */
     type_get_memory_usage,
     flush,
-    get_features,
-    get_tables,
+    query_tables,
+    set_tables_version,
     port_alloc,
     port_construct,
     port_destruct,
@@ -4891,6 +5793,7 @@ const struct ofproto_class ofproto_dpif_class = {
     port_poll,
     port_poll_wait,
     port_is_lacp_current,
+    port_get_lacp_stats,
     NULL,                       /* rule_choose_table */
     rule_alloc,
     rule_construct,
@@ -4900,22 +5803,35 @@ const struct ofproto_class ofproto_dpif_class = {
     rule_dealloc,
     rule_get_stats,
     rule_execute,
-    rule_modify_actions,
     set_frag_handling,
     packet_out,
+    nxt_resume,
     set_netflow,
     get_netflow_ids,
     set_sflow,
     set_ipfix,
     set_cfm,
+    cfm_status_changed,
     get_cfm_status,
+    set_lldp,
+    get_lldp_status,
+    set_aa,
+    aa_mapping_set,
+    aa_mapping_unset,
+    aa_vlan_get_queued,
+    aa_vlan_get_queue_size,
     set_bfd,
+    bfd_status_changed,
     get_bfd_status,
     set_stp,
     get_stp_status,
     set_stp_port,
     get_stp_port_status,
     get_stp_port_stats,
+    set_rstp,
+    get_rstp_status,
+    set_rstp_port,
+    get_rstp_port_status,
     set_queues,
     bundle_set,
     bundle_remove,
@@ -4925,6 +5841,8 @@ const struct ofproto_class ofproto_dpif_class = {
     is_mirror_output_bundle,
     forward_bpdu_changed,
     set_mac_table_config,
+    set_mcast_snooping,
+    set_mcast_snooping_port,
     set_realdev,
     NULL,                       /* meter_get_features */
     NULL,                       /* meter_set */
@@ -4936,4 +5854,5 @@ const struct ofproto_class ofproto_dpif_class = {
     group_dealloc,              /* group_dealloc */
     group_modify,               /* group_modify */
     group_get_stats,            /* group_get_stats */
+    get_datapath_version,       /* get_datapath_version */
 };