dpif_packet: Rename to dp_packet
[cascardo/ovs.git] / lib / dpif-netdev.c
index f184a35..70ef97b 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "cmap.h"
 #include "csum.h"
+#include "dp-packet.h"
 #include "dpif.h"
 #include "dpif-provider.h"
 #include "dummy.h"
@@ -54,7 +55,6 @@
 #include "ofpbuf.h"
 #include "ovs-numa.h"
 #include "ovs-rcu.h"
-#include "packet-dpif.h"
 #include "packets.h"
 #include "poll-loop.h"
 #include "pvector.h"
@@ -66,7 +66,7 @@
 #include "tnl-arp-cache.h"
 #include "unixctl.h"
 #include "util.h"
-#include "vlog.h"
+#include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
 
@@ -93,7 +93,7 @@ struct netdev_flow_key {
     uint32_t hash;       /* Hash function differs for different users. */
     uint32_t len;        /* Length of the following miniflow (incl. map). */
     struct miniflow mf;
-    uint32_t buf[FLOW_MAX_PACKET_U32S - MINI_N_INLINE];
+    uint64_t buf[FLOW_MAX_PACKET_U64S - MINI_N_INLINE];
 };
 
 /* Exact match cache for frequently used flows
@@ -177,7 +177,6 @@ static bool dpcls_lookup(const struct dpcls *cls,
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
- *    flow_mutex
  */
 struct dp_netdev {
     const struct dpif_class *const class;
@@ -186,20 +185,6 @@ struct dp_netdev {
     struct ovs_refcount ref_cnt;
     atomic_flag destroyed;
 
-    /* Flows.
-     *
-     * Writers of 'flow_table' must take the 'flow_mutex'.  Corresponding
-     * changes to 'cls' must be made while still holding the 'flow_mutex'.
-     */
-    struct ovs_mutex flow_mutex;
-    struct dpcls cls;
-    struct cmap flow_table OVS_GUARDED; /* Flow table. */
-
-    /* Statistics.
-     *
-     * ovsthread_stats is internally synchronized. */
-    struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
-
     /* Ports.
      *
      * Protected by RCU.  Take the mutex to add or remove ports. */
@@ -241,15 +226,6 @@ enum dp_stat_type {
     DP_N_STATS
 };
 
-/* Contained by struct dp_netdev's 'stats' member.  */
-struct dp_netdev_stats {
-    struct ovs_mutex mutex;          /* Protects 'n'. */
-
-    /* Indexed by DP_STAT_*, protected by 'mutex'. */
-    unsigned long long int n[DP_N_STATS] OVS_GUARDED;
-};
-
-
 /* A port in a netdev-based datapath. */
 struct dp_netdev_port {
     struct cmap_node node;      /* Node in dp_netdev's 'ports'. */
@@ -261,15 +237,22 @@ struct dp_netdev_port {
     char *type;                 /* Port type as requested by user. */
 };
 
-\f
-/* A flow in dp_netdev's 'flow_table'.
+/* Contained by struct dp_netdev_flow's 'stats' member.  */
+struct dp_netdev_flow_stats {
+    long long int used;             /* Last used time, in monotonic msecs. */
+    long long int packet_count;     /* Number of packets matched. */
+    long long int byte_count;       /* Number of bytes matched. */
+    uint16_t tcp_flags;             /* Bitwise-OR of seen tcp_flags values. */
+};
+
+/* A flow in 'dp_netdev_pmd_thread's 'flow_table'.
  *
  *
  * Thread-safety
  * =============
  *
  * Except near the beginning or ending of its lifespan, rule 'rule' belongs to
- * its dp_netdev's classifier.  The text below calls this classifier 'cls'.
+ * its pmd thread's classifier.  The text below calls this classifier 'cls'.
  *
  * Motivation
  * ----------
@@ -303,8 +286,12 @@ struct dp_netdev_flow {
     bool dead;
 
     /* Hash table index by unmasked flow. */
-    const struct cmap_node node; /* In owning dp_netdev's 'flow_table'. */
+    const struct cmap_node node; /* In owning dp_netdev_pmd_thread's */
+                                 /* 'flow_table'. */
+    const ovs_u128 ufid;         /* Unique flow identifier. */
     const struct flow flow;      /* Unmasked flow that created this entry. */
+    const int pmd_id;            /* The 'core_id' of pmd thread owning this */
+                                 /* flow. */
 
     /* Number of references.
      * The classifier owns one reference.
@@ -312,10 +299,8 @@ struct dp_netdev_flow {
      * reference. */
     struct ovs_refcount ref_cnt;
 
-    /* Statistics.
-     *
-     * Reading or writing these members requires 'mutex'. */
-    struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
+    /* Statistics. */
+    struct dp_netdev_flow_stats stats;
 
     /* Actions. */
     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
@@ -327,16 +312,8 @@ struct dp_netdev_flow {
 
 static void dp_netdev_flow_unref(struct dp_netdev_flow *);
 static bool dp_netdev_flow_ref(struct dp_netdev_flow *);
-
-/* Contained by struct dp_netdev_flow's 'stats' member.  */
-struct dp_netdev_flow_stats {
-    struct ovs_mutex mutex;         /* Guards all the other members. */
-
-    long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
-    long long int packet_count OVS_GUARDED; /* Number of packets matched. */
-    long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
-    uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
-};
+static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t,
+                                         struct flow *);
 
 /* A set of datapath actions within a "struct dp_netdev_flow".
  *
@@ -358,20 +335,31 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions(
     const struct dp_netdev_flow *);
 static void dp_netdev_actions_free(struct dp_netdev_actions *);
 
+/* Contained by struct dp_netdev_pmd_thread's 'stats' member.  */
+struct dp_netdev_pmd_stats {
+    /* Indexed by DP_STAT_*. */
+    unsigned long long int n[DP_N_STATS];
+};
+
 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
  * the performance overhead of interrupt processing.  Therefore netdev can
  * not implement rx-wait for these devices.  dpif-netdev needs to poll
  * these device to check for recv buffer.  pmd-thread does polling for
- * devices assigned to itself thread.
+ * devices assigned to itself.
  *
  * DPDK used PMD for accessing NIC.
  *
  * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for
  * I/O of all non-pmd threads.  There will be no actual thread created
  * for the instance.
- **/
+ *
+ * Each struct has its own flow table and classifier.  Packets received
+ * from managed ports are looked up in the corresponding pmd thread's
+ * flow table, and are executed with the found actions.
+ * */
 struct dp_netdev_pmd_thread {
     struct dp_netdev *dp;
+    struct ovs_refcount ref_cnt;    /* Every reference must be refcount'ed. */
     struct cmap_node node;          /* In 'dp->poll_threads'. */
 
     pthread_cond_t cond;            /* For synchronizing pmd thread reload. */
@@ -382,6 +370,19 @@ struct dp_netdev_pmd_thread {
      * need to be protected (e.g. by 'dp_netdev_mutex').  All other
      * instances will only be accessed by its own pmd thread. */
     struct emc_cache flow_cache;
+
+    /* Classifier and Flow-Table.
+     *
+     * Writers of 'flow_table' must take the 'flow_mutex'.  Corresponding
+     * changes to 'cls' must be made while still holding the 'flow_mutex'.
+     */
+    struct ovs_mutex flow_mutex;
+    struct dpcls cls;
+    struct cmap flow_table OVS_GUARDED; /* Flow table. */
+
+    /* Statistics. */
+    struct dp_netdev_pmd_stats stats;
+
     struct latch exit_latch;        /* For terminating the pmd thread. */
     atomic_uint change_seq;         /* For reloading pmd ports. */
     pthread_t thread;
@@ -406,7 +407,6 @@ static int get_port_by_name(struct dp_netdev *dp, const char *devname,
                             struct dp_netdev_port **portp);
 static void dp_netdev_free(struct dp_netdev *)
     OVS_REQUIRES(dp_netdev_mutex);
-static void dp_netdev_flow_flush(struct dp_netdev *);
 static int do_add_port(struct dp_netdev *dp, const char *devname,
                        const char *type, odp_port_t port_no)
     OVS_REQUIRES(dp->port_mutex);
@@ -415,24 +415,31 @@ static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
 static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
-                                      struct dpif_packet **, int c,
+                                      struct dp_packet **, int c,
                                       bool may_steal,
                                       const struct nlattr *actions,
                                       size_t actions_len);
 static void dp_netdev_input(struct dp_netdev_pmd_thread *,
-                            struct dpif_packet **, int cnt);
+                            struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
                                     struct dp_netdev *dp, int index,
                                     int core_id, int numa_id);
+static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd);
 static void dp_netdev_set_nonpmd(struct dp_netdev *dp);
-static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
+static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp,
+                                                      int core_id);
+static struct dp_netdev_pmd_thread *
+dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp);
+static bool dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
 
 static inline bool emc_entry_alive(struct emc_entry *ce);
 static void emc_clear_entry(struct emc_entry *ce);
@@ -600,12 +607,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_refcount_init(&dp->ref_cnt);
     atomic_flag_clear(&dp->destroyed);
 
-    ovs_mutex_init(&dp->flow_mutex);
-    dpcls_init(&dp->cls);
-    cmap_init(&dp->flow_table);
-
-    ovsthread_stats_init(&dp->stats);
-
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
@@ -682,8 +683,6 @@ dp_netdev_free(struct dp_netdev *dp)
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev_port *port;
-    struct dp_netdev_stats *bucket;
-    int i;
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
@@ -692,22 +691,12 @@ dp_netdev_free(struct dp_netdev *dp)
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
-    dp_netdev_flow_flush(dp);
     ovs_mutex_lock(&dp->port_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
         do_del_port(dp, port);
     }
     ovs_mutex_unlock(&dp->port_mutex);
 
-    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
-        ovs_mutex_destroy(&bucket->mutex);
-        free_cacheline(bucket);
-    }
-    ovsthread_stats_destroy(&dp->stats);
-
-    dpcls_destroy(&dp->cls);
-    cmap_destroy(&dp->flow_table);
-    ovs_mutex_destroy(&dp->flow_mutex);
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
 
@@ -761,18 +750,14 @@ static int
 dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_stats *bucket;
-    size_t i;
-
-    stats->n_flows = cmap_count(&dp->flow_table);
+    struct dp_netdev_pmd_thread *pmd;
 
-    stats->n_hit = stats->n_missed = stats->n_lost = 0;
-    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
-        ovs_mutex_lock(&bucket->mutex);
-        stats->n_hit += bucket->n[DP_STAT_HIT];
-        stats->n_missed += bucket->n[DP_STAT_MISS];
-        stats->n_lost += bucket->n[DP_STAT_LOST];
-        ovs_mutex_unlock(&bucket->mutex);
+    stats->n_flows = stats->n_hit = stats->n_missed = stats->n_lost = 0;
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        stats->n_flows += cmap_count(&pmd->flow_table);
+        stats->n_hit += pmd->stats.n[DP_STAT_HIT];
+        stats->n_missed += pmd->stats.n[DP_STAT_MISS];
+        stats->n_lost += pmd->stats.n[DP_STAT_LOST];
     }
     stats->n_masks = UINT32_MAX;
     stats->n_mask_hit = UINT64_MAX;
@@ -1136,15 +1121,6 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
 static void
 dp_netdev_flow_free(struct dp_netdev_flow *flow)
 {
-    struct dp_netdev_flow_stats *bucket;
-    size_t i;
-
-    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
-        ovs_mutex_destroy(&bucket->mutex);
-        free_cacheline(bucket);
-    }
-    ovsthread_stats_destroy(&flow->stats);
-
     dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
     free(flow);
 }
@@ -1156,37 +1132,48 @@ static void dp_netdev_flow_unref(struct dp_netdev_flow *flow)
     }
 }
 
+static uint32_t
+dp_netdev_flow_hash(const ovs_u128 *ufid)
+{
+    return ufid->u32[0];
+}
+
 static void
-dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
-    OVS_REQUIRES(dp->flow_mutex)
+dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd,
+                          struct dp_netdev_flow *flow)
+    OVS_REQUIRES(pmd->flow_mutex)
 {
     struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
 
-    dpcls_remove(&dp->cls, &flow->cr);
-    cmap_remove(&dp->flow_table, node, flow_hash(&flow->flow, 0));
+    dpcls_remove(&pmd->cls, &flow->cr);
+    cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid));
     flow->dead = true;
 
     dp_netdev_flow_unref(flow);
 }
 
 static void
-dp_netdev_flow_flush(struct dp_netdev *dp)
+dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd)
 {
     struct dp_netdev_flow *netdev_flow;
 
-    ovs_mutex_lock(&dp->flow_mutex);
-    CMAP_FOR_EACH (netdev_flow, node, &dp->flow_table) {
-        dp_netdev_remove_flow(dp, netdev_flow);
+    ovs_mutex_lock(&pmd->flow_mutex);
+    CMAP_FOR_EACH (netdev_flow, node, &pmd->flow_table) {
+        dp_netdev_pmd_remove_flow(pmd, netdev_flow);
     }
-    ovs_mutex_unlock(&dp->flow_mutex);
+    ovs_mutex_unlock(&pmd->flow_mutex);
 }
 
 static int
 dpif_netdev_flow_flush(struct dpif *dpif)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_pmd_flow_flush(pmd);
+    }
 
-    dp_netdev_flow_flush(dp);
     return 0;
 }
 
@@ -1356,8 +1343,8 @@ static inline void
 netdev_flow_mask_init(struct netdev_flow_key *mask,
                       const struct match *match)
 {
-    const uint32_t *mask_u32 = (const uint32_t *) &match->wc.masks;
-    uint32_t *dst = mask->mf.inline_values;
+    const uint64_t *mask_u64 = (const uint64_t *) &match->wc.masks;
+    uint64_t *dst = mask->mf.inline_values;
     uint64_t map, mask_map = 0;
     uint32_t hash = 0;
     int n;
@@ -1369,10 +1356,10 @@ netdev_flow_mask_init(struct netdev_flow_key *mask,
         uint64_t rm1bit = rightmost_1bit(map);
         int i = raw_ctz(map);
 
-        if (mask_u32[i]) {
+        if (mask_u64[i]) {
             mask_map |= rm1bit;
-            *dst++ = mask_u32[i];
-            hash = hash_add(hash, mask_u32[i]);
+            *dst++ = mask_u64[i];
+            hash = hash_add64(hash, mask_u64[i]);
         }
         map -= rm1bit;
     }
@@ -1380,12 +1367,11 @@ netdev_flow_mask_init(struct netdev_flow_key *mask,
     mask->mf.values_inline = true;
     mask->mf.map = mask_map;
 
-    hash = hash_add(hash, mask_map);
-    hash = hash_add(hash, mask_map >> 32);
+    hash = hash_add64(hash, mask_map);
 
     n = dst - mask->mf.inline_values;
 
-    mask->hash = hash_finish(hash, n * 4);
+    mask->hash = hash_finish(hash, n * 8);
     mask->len = netdev_flow_key_size(n);
 }
 
@@ -1395,23 +1381,23 @@ netdev_flow_key_init_masked(struct netdev_flow_key *dst,
                             const struct flow *flow,
                             const struct netdev_flow_key *mask)
 {
-    uint32_t *dst_u32 = dst->mf.inline_values;
-    const uint32_t *mask_u32 = mask->mf.inline_values;
+    uint64_t *dst_u64 = dst->mf.inline_values;
+    const uint64_t *mask_u64 = mask->mf.inline_values;
     uint32_t hash = 0;
-    uint32_t value;
+    uint64_t value;
 
     dst->len = mask->len;
     dst->mf.values_inline = true;
     dst->mf.map = mask->mf.map;
 
     FLOW_FOR_EACH_IN_MAP(value, flow, mask->mf.map) {
-        *dst_u32 = value & *mask_u32++;
-        hash = hash_add(hash, *dst_u32++);
+        *dst_u64 = value & *mask_u64++;
+        hash = hash_add64(hash, *dst_u64++);
     }
-    dst->hash = hash_finish(hash, (dst_u32 - dst->mf.inline_values) * 4);
+    dst->hash = hash_finish(hash, (dst_u64 - dst->mf.inline_values) * 8);
 }
 
-/* Iterate through all netdev_flow_key u32 values specified by 'MAP' */
+/* Iterate through all netdev_flow_key u64 values specified by 'MAP' */
 #define NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(VALUE, KEY, MAP)           \
     for (struct mf_for_each_in_map_aux aux__                       \
              = { (KEY)->mf.inline_values, (KEY)->mf.map, MAP };    \
@@ -1424,15 +1410,15 @@ static inline uint32_t
 netdev_flow_key_hash_in_mask(const struct netdev_flow_key *key,
                              const struct netdev_flow_key *mask)
 {
-    const uint32_t *p = mask->mf.inline_values;
+    const uint64_t *p = mask->mf.inline_values;
     uint32_t hash = 0;
-    uint32_t key_u32;
+    uint64_t key_u64;
 
-    NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(key_u32, key, mask->mf.map) {
-        hash = hash_add(hash, key_u32 & *p++);
+    NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(key_u64, key, mask->mf.map) {
+        hash = hash_add64(hash, key_u64 & *p++);
     }
 
-    return hash_finish(hash, (p - mask->mf.inline_values) * 4);
+    return hash_finish(hash, (p - mask->mf.inline_values) * 8);
 }
 
 static inline bool
@@ -1518,27 +1504,40 @@ emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
 }
 
 static struct dp_netdev_flow *
-dp_netdev_lookup_flow(const struct dp_netdev *dp,
-                      const struct netdev_flow_key *key)
+dp_netdev_pmd_lookup_flow(const struct dp_netdev_pmd_thread *pmd,
+                          const struct netdev_flow_key *key)
 {
     struct dp_netdev_flow *netdev_flow;
     struct dpcls_rule *rule;
 
-    dpcls_lookup(&dp->cls, key, &rule, 1);
+    dpcls_lookup(&pmd->cls, key, &rule, 1);
     netdev_flow = dp_netdev_flow_cast(rule);
 
     return netdev_flow;
 }
 
 static struct dp_netdev_flow *
-dp_netdev_find_flow(const struct dp_netdev *dp, const struct flow *flow)
+dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd,
+                        const ovs_u128 *ufidp, const struct nlattr *key,
+                        size_t key_len)
 {
     struct dp_netdev_flow *netdev_flow;
+    struct flow flow;
+    ovs_u128 ufid;
+
+    /* If a UFID is not provided, determine one based on the key. */
+    if (!ufidp && key && key_len
+        && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) {
+        dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid);
+        ufidp = &ufid;
+    }
 
-    CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, flow_hash(flow, 0),
-                             &dp->flow_table) {
-        if (flow_equal(&netdev_flow->flow, flow)) {
-            return netdev_flow;
+    if (ufidp) {
+        CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, dp_netdev_flow_hash(ufidp),
+                                 &pmd->flow_table) {
+            if (ovs_u128_equal(&netdev_flow->ufid, ufidp)) {
+                return netdev_flow;
+            }
         }
     }
 
@@ -1549,38 +1548,54 @@ static void
 get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow,
                     struct dpif_flow_stats *stats)
 {
-    struct dp_netdev_flow_stats *bucket;
-    size_t i;
-
-    memset(stats, 0, sizeof *stats);
-    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
-        ovs_mutex_lock(&bucket->mutex);
-        stats->n_packets += bucket->packet_count;
-        stats->n_bytes += bucket->byte_count;
-        stats->used = MAX(stats->used, bucket->used);
-        stats->tcp_flags |= bucket->tcp_flags;
-        ovs_mutex_unlock(&bucket->mutex);
-    }
+    stats->n_packets = netdev_flow->stats.packet_count;
+    stats->n_bytes = netdev_flow->stats.byte_count;
+    stats->used = netdev_flow->stats.used;
+    stats->tcp_flags = netdev_flow->stats.tcp_flags;
 }
 
+/* Converts to the dpif_flow format, using 'key_buf' and 'mask_buf' for
+ * storing the netlink-formatted key/mask. 'key_buf' may be the same as
+ * 'mask_buf'. Actions will be returned without copying, by relying on RCU to
+ * protect them. */
 static void
 dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow,
-                            struct ofpbuf *buffer, struct dpif_flow *flow)
+                            struct ofpbuf *key_buf, struct ofpbuf *mask_buf,
+                            struct dpif_flow *flow, bool terse)
 {
-    struct flow_wildcards wc;
-    struct dp_netdev_actions *actions;
+    if (terse) {
+        memset(flow, 0, sizeof *flow);
+    } else {
+        struct flow_wildcards wc;
+        struct dp_netdev_actions *actions;
+        size_t offset;
 
-    miniflow_expand(&netdev_flow->cr.mask->mf, &wc.masks);
-    odp_flow_key_from_mask(buffer, &wc.masks, &netdev_flow->flow,
-                           odp_to_u32(wc.masks.in_port.odp_port),
-                           SIZE_MAX, true);
-    flow->mask = ofpbuf_data(buffer);
-    flow->mask_len = ofpbuf_size(buffer);
+        miniflow_expand(&netdev_flow->cr.mask->mf, &wc.masks);
 
-    actions = dp_netdev_flow_get_actions(netdev_flow);
-    flow->actions = actions->actions;
-    flow->actions_len = actions->size;
+        /* Key */
+        offset = ofpbuf_size(key_buf);
+        flow->key = ofpbuf_tail(key_buf);
+        odp_flow_key_from_flow(key_buf, &netdev_flow->flow, &wc.masks,
+                               netdev_flow->flow.in_port.odp_port, true);
+        flow->key_len = ofpbuf_size(key_buf) - offset;
 
+        /* Mask */
+        offset = ofpbuf_size(mask_buf);
+        flow->mask = ofpbuf_tail(mask_buf);
+        odp_flow_key_from_mask(mask_buf, &wc.masks, &netdev_flow->flow,
+                               odp_to_u32(wc.masks.in_port.odp_port),
+                               SIZE_MAX, true);
+        flow->mask_len = ofpbuf_size(mask_buf) - offset;
+
+        /* Actions */
+        actions = dp_netdev_flow_get_actions(netdev_flow);
+        flow->actions = actions->actions;
+        flow->actions_len = actions->size;
+    }
+
+    flow->ufid = netdev_flow->ufid;
+    flow->ufid_present = true;
+    flow->pmd_id = netdev_flow->pmd_id;
     get_dpif_flow_stats(netdev_flow, &flow->stats);
 }
 
@@ -1681,29 +1696,34 @@ dpif_netdev_flow_get(const struct dpif *dpif, const struct dpif_flow_get *get)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *netdev_flow;
-    struct flow key;
-    int error;
+    struct dp_netdev_pmd_thread *pmd;
+    int pmd_id = get->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : get->pmd_id;
+    int error = 0;
 
-    error = dpif_netdev_flow_from_nlattrs(get->key, get->key_len, &key);
-    if (error) {
-        return error;
+    pmd = dp_netdev_get_pmd(dp, pmd_id);
+    if (!pmd) {
+        return EINVAL;
     }
 
-    netdev_flow = dp_netdev_find_flow(dp, &key);
-
+    netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key,
+                                          get->key_len);
     if (netdev_flow) {
-        dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->flow);
-     } else {
+        dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer,
+                                    get->flow, false);
+    } else {
         error = ENOENT;
     }
+    dp_netdev_pmd_unref(pmd);
+
 
     return error;
 }
 
 static struct dp_netdev_flow *
-dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
+dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd,
+                   struct match *match, const ovs_u128 *ufid,
                    const struct nlattr *actions, size_t actions_len)
-    OVS_REQUIRES(dp->flow_mutex)
+    OVS_REQUIRES(pmd->flow_mutex)
 {
     struct dp_netdev_flow *flow;
     struct netdev_flow_key mask;
@@ -1714,17 +1734,19 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
 
     /* Do not allocate extra space. */
     flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len);
+    memset(&flow->stats, 0, sizeof flow->stats);
     flow->dead = false;
+    *CONST_CAST(int *, &flow->pmd_id) = pmd->core_id;
     *CONST_CAST(struct flow *, &flow->flow) = match->flow;
+    *CONST_CAST(ovs_u128 *, &flow->ufid) = *ufid;
     ovs_refcount_init(&flow->ref_cnt);
-    ovsthread_stats_init(&flow->stats);
     ovsrcu_set(&flow->actions, dp_netdev_actions_create(actions, actions_len));
 
-    cmap_insert(&dp->flow_table,
-                CONST_CAST(struct cmap_node *, &flow->node),
-                flow_hash(&flow->flow, 0));
     netdev_flow_key_init_masked(&flow->cr.flow, &match->flow, &mask);
-    dpcls_insert(&dp->cls, &flow->cr, &mask);
+    dpcls_insert(&pmd->cls, &flow->cr, &mask);
+
+    cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node),
+                dp_netdev_flow_hash(&flow->ufid));
 
     if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) {
         struct match match;
@@ -1734,6 +1756,8 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
         miniflow_expand(&flow->cr.mask->mf, &match.wc.masks);
 
         ds_put_cstr(&ds, "flow_add: ");
+        odp_format_ufid(ufid, &ds);
+        ds_put_cstr(&ds, " ");
         match_format(&match, &ds, OFP_DEFAULT_PRIORITY);
         ds_put_cstr(&ds, ", actions:");
         format_odp_actions(&ds, actions, actions_len);
@@ -1746,29 +1770,16 @@ dp_netdev_flow_add(struct dp_netdev *dp, struct match *match,
     return flow;
 }
 
-static void
-clear_stats(struct dp_netdev_flow *netdev_flow)
-{
-    struct dp_netdev_flow_stats *bucket;
-    size_t i;
-
-    OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
-        ovs_mutex_lock(&bucket->mutex);
-        bucket->used = 0;
-        bucket->packet_count = 0;
-        bucket->byte_count = 0;
-        bucket->tcp_flags = 0;
-        ovs_mutex_unlock(&bucket->mutex);
-    }
-}
-
 static int
 dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *netdev_flow;
     struct netdev_flow_key key;
+    struct dp_netdev_pmd_thread *pmd;
     struct match match;
+    ovs_u128 ufid;
+    int pmd_id = put->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : put->pmd_id;
     int error;
 
     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow);
@@ -1782,20 +1793,32 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
         return error;
     }
 
+    pmd = dp_netdev_get_pmd(dp, pmd_id);
+    if (!pmd) {
+        return EINVAL;
+    }
+
     /* Must produce a netdev_flow_key for lookup.
      * This interface is no longer performance critical, since it is not used
      * for upcall processing any more. */
     netdev_flow_key_from_flow(&key, &match.flow);
 
-    ovs_mutex_lock(&dp->flow_mutex);
-    netdev_flow = dp_netdev_lookup_flow(dp, &key);
+    if (put->ufid) {
+        ufid = *put->ufid;
+    } else {
+        dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid);
+    }
+
+    ovs_mutex_lock(&pmd->flow_mutex);
+    netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key);
     if (!netdev_flow) {
         if (put->flags & DPIF_FP_CREATE) {
-            if (cmap_count(&dp->flow_table) < MAX_FLOWS) {
+            if (cmap_count(&pmd->flow_table) < MAX_FLOWS) {
                 if (put->stats) {
                     memset(put->stats, 0, sizeof *put->stats);
                 }
-                dp_netdev_flow_add(dp, &match, put->actions, put->actions_len);
+                dp_netdev_flow_add(pmd, &match, &ufid, put->actions,
+                                   put->actions_len);
                 error = 0;
             } else {
                 error = EFBIG;
@@ -1819,7 +1842,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
                 get_dpif_flow_stats(netdev_flow, put->stats);
             }
             if (put->flags & DPIF_FP_ZERO_STATS) {
-                clear_stats(netdev_flow);
+                memset(&netdev_flow->stats, 0, sizeof netdev_flow->stats);
             }
 
             ovsrcu_postpone(dp_netdev_actions_free, old_actions);
@@ -1830,7 +1853,8 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
             error = EINVAL;
         }
     }
-    ovs_mutex_unlock(&dp->flow_mutex);
+    ovs_mutex_unlock(&pmd->flow_mutex);
+    dp_netdev_pmd_unref(pmd);
 
     return error;
 }
@@ -1840,32 +1864,37 @@ dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_flow *netdev_flow;
-    struct flow key;
-    int error;
+    struct dp_netdev_pmd_thread *pmd;
+    int pmd_id = del->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : del->pmd_id;
+    int error = 0;
 
-    error = dpif_netdev_flow_from_nlattrs(del->key, del->key_len, &key);
-    if (error) {
-        return error;
+    pmd = dp_netdev_get_pmd(dp, pmd_id);
+    if (!pmd) {
+        return EINVAL;
     }
 
-    ovs_mutex_lock(&dp->flow_mutex);
-    netdev_flow = dp_netdev_find_flow(dp, &key);
+    ovs_mutex_lock(&pmd->flow_mutex);
+    netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key,
+                                          del->key_len);
     if (netdev_flow) {
         if (del->stats) {
             get_dpif_flow_stats(netdev_flow, del->stats);
         }
-        dp_netdev_remove_flow(dp, netdev_flow);
+        dp_netdev_pmd_remove_flow(pmd, netdev_flow);
     } else {
         error = ENOENT;
     }
-    ovs_mutex_unlock(&dp->flow_mutex);
+    ovs_mutex_unlock(&pmd->flow_mutex);
+    dp_netdev_pmd_unref(pmd);
 
     return error;
 }
 
 struct dpif_netdev_flow_dump {
     struct dpif_flow_dump up;
-    struct cmap_position pos;
+    struct cmap_position poll_thread_pos;
+    struct cmap_position flow_pos;
+    struct dp_netdev_pmd_thread *cur_pmd;
     int status;
     struct ovs_mutex mutex;
 };
@@ -1877,14 +1906,13 @@ dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
 }
 
 static struct dpif_flow_dump *
-dpif_netdev_flow_dump_create(const struct dpif *dpif_)
+dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse)
 {
     struct dpif_netdev_flow_dump *dump;
 
-    dump = xmalloc(sizeof *dump);
+    dump = xzalloc(sizeof *dump);
     dpif_flow_dump_init(&dump->up, dpif_);
-    memset(&dump->pos, 0, sizeof dump->pos);
-    dump->status = 0;
+    dump->up.terse = terse;
     ovs_mutex_init(&dump->mutex);
 
     return &dump->up;
@@ -1941,26 +1969,58 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
     struct dpif_netdev_flow_dump_thread *thread
         = dpif_netdev_flow_dump_thread_cast(thread_);
     struct dpif_netdev_flow_dump *dump = thread->dump;
-    struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
     struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
-    struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
     int n_flows = 0;
     int i;
 
     ovs_mutex_lock(&dump->mutex);
     if (!dump->status) {
-        for (n_flows = 0; n_flows < MIN(max_flows, FLOW_DUMP_MAX_BATCH);
-             n_flows++) {
-            struct cmap_node *node;
+        struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
+        struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
+        struct dp_netdev_pmd_thread *pmd = dump->cur_pmd;
+        int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH);
+
+        /* First call to dump_next(), extracts the first pmd thread.
+         * If there is no pmd thread, returns immediately. */
+        if (!pmd) {
+            pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
+            if (!pmd) {
+                ovs_mutex_unlock(&dump->mutex);
+                return n_flows;
 
-            node = cmap_next_position(&dp->flow_table, &dump->pos);
-            if (!node) {
-                dump->status = EOF;
-                break;
             }
-            netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow,
-                                                 node);
         }
+
+        do {
+            for (n_flows = 0; n_flows < flow_limit; n_flows++) {
+                struct cmap_node *node;
+
+                node = cmap_next_position(&pmd->flow_table, &dump->flow_pos);
+                if (!node) {
+                    break;
+                }
+                netdev_flows[n_flows] = CONTAINER_OF(node,
+                                                     struct dp_netdev_flow,
+                                                     node);
+            }
+            /* When finishing dumping the current pmd thread, moves to
+             * the next. */
+            if (n_flows < flow_limit) {
+                memset(&dump->flow_pos, 0, sizeof dump->flow_pos);
+                dp_netdev_pmd_unref(pmd);
+                pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos);
+                if (!pmd) {
+                    dump->status = EOF;
+                    break;
+                }
+            }
+            /* Keeps the reference to next caller. */
+            dump->cur_pmd = pmd;
+
+            /* If the current dump is empty, do not exit the loop, since the
+             * remaining pmds could have flows to be dumped.  Just dumps again
+             * on the new 'pmd'. */
+        } while (!n_flows);
     }
     ovs_mutex_unlock(&dump->mutex);
 
@@ -1969,34 +2029,12 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
         struct odputil_keybuf *keybuf = &thread->keybuf[i];
         struct dp_netdev_flow *netdev_flow = netdev_flows[i];
         struct dpif_flow *f = &flows[i];
-        struct dp_netdev_actions *dp_actions;
-        struct flow_wildcards wc;
-        struct ofpbuf buf;
+        struct ofpbuf key, mask;
 
-        miniflow_expand(&netdev_flow->cr.mask->mf, &wc.masks);
-
-        /* Key. */
-        ofpbuf_use_stack(&buf, keybuf, sizeof *keybuf);
-        odp_flow_key_from_flow(&buf, &netdev_flow->flow, &wc.masks,
-                               netdev_flow->flow.in_port.odp_port, true);
-        f->key = ofpbuf_data(&buf);
-        f->key_len = ofpbuf_size(&buf);
-
-        /* Mask. */
-        ofpbuf_use_stack(&buf, maskbuf, sizeof *maskbuf);
-        odp_flow_key_from_mask(&buf, &wc.masks, &netdev_flow->flow,
-                               odp_to_u32(wc.masks.in_port.odp_port),
-                               SIZE_MAX, true);
-        f->mask = ofpbuf_data(&buf);
-        f->mask_len = ofpbuf_size(&buf);
-
-        /* Actions. */
-        dp_actions = dp_netdev_flow_get_actions(netdev_flow);
-        f->actions = dp_actions->actions;
-        f->actions_len = dp_actions->size;
-
-        /* Stats. */
-        get_dpif_flow_stats(netdev_flow, &f->stats);
+        ofpbuf_use_stack(&key, keybuf, sizeof *keybuf);
+        ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf);
+        dp_netdev_flow_to_dpif_flow(netdev_flow, &key, &mask, f,
+                                    dump->up.terse);
     }
 
     return n_flows;
@@ -2008,7 +2046,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
     struct dp_netdev_pmd_thread *pmd;
-    struct dpif_packet packet, *pp;
+    struct dp_packet packet, *pp;
 
     if (ofpbuf_size(execute->packet) < ETH_HEADER_LEN ||
         ofpbuf_size(execute->packet) > UINT16_MAX) {
@@ -2021,20 +2059,24 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
 
     /* Tries finding the 'pmd'.  If NULL is returned, that means
      * the current thread is a non-pmd thread and should use
-     * dp_netdev_get_nonpmd(). */
+     * dp_netdev_get_pmd(dp, NON_PMD_CORE_ID). */
     pmd = ovsthread_getspecific(dp->per_pmd_key);
     if (!pmd) {
-        pmd = dp_netdev_get_nonpmd(dp);
+        pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID);
     }
 
     /* If the current thread is non-pmd thread, acquires
      * the 'non_pmd_mutex'. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
         ovs_mutex_lock(&dp->non_pmd_mutex);
+        ovs_mutex_lock(&dp->port_mutex);
     }
+
     dp_netdev_execute_actions(pmd, &pp, 1, false, execute->actions,
                               execute->actions_len);
     if (pmd->core_id == NON_PMD_CORE_ID) {
+        dp_netdev_pmd_unref(pmd);
+        ovs_mutex_unlock(&dp->port_mutex);
         ovs_mutex_unlock(&dp->non_pmd_mutex);
     }
 
@@ -2188,7 +2230,7 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct dp_netdev_port *port,
                            struct netdev_rxq *rxq)
 {
-    struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
+    struct dp_packet *packets[NETDEV_MAX_RX_BATCH];
     int error, cnt;
 
     error = netdev_rxq_recv(rxq, packets, &cnt);
@@ -2216,7 +2258,8 @@ dpif_netdev_run(struct dpif *dpif)
 {
     struct dp_netdev_port *port;
     struct dp_netdev *dp = get_dp_netdev(dpif);
-    struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_nonpmd(dp);
+    struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_pmd(dp,
+                                                             NON_PMD_CORE_ID);
     uint64_t new_tnl_seq;
 
     ovs_mutex_lock(&dp->non_pmd_mutex);
@@ -2230,6 +2273,8 @@ dpif_netdev_run(struct dpif *dpif)
         }
     }
     ovs_mutex_unlock(&dp->non_pmd_mutex);
+    dp_netdev_pmd_unref(non_pmd);
+
     tnl_arp_cache_run();
     new_tnl_seq = seq_read(tnl_conf_seq);
 
@@ -2413,18 +2458,23 @@ dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
     ovs_mutex_unlock(&pmd->cond_mutex);
 }
 
-/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */
+/* Finds and refs the dp_netdev_pmd_thread on core 'core_id'.  Returns
+ * the pointer if succeeds, otherwise, NULL.
+ *
+ * Caller must unrefs the returned reference.  */
 static struct dp_netdev_pmd_thread *
-dp_netdev_get_nonpmd(struct dp_netdev *dp)
+dp_netdev_get_pmd(struct dp_netdev *dp, int core_id)
 {
     struct dp_netdev_pmd_thread *pmd;
     const struct cmap_node *pnode;
 
-    pnode = cmap_find(&dp->poll_threads, hash_int(NON_PMD_CORE_ID, 0));
-    ovs_assert(pnode);
+    pnode = cmap_find(&dp->poll_threads, hash_int(core_id, 0));
+    if (!pnode) {
+        return NULL;
+    }
     pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
 
-    return pmd;
+    return dp_netdev_pmd_try_ref(pmd) ? pmd : NULL;
 }
 
 /* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */
@@ -2438,6 +2488,41 @@ dp_netdev_set_nonpmd(struct dp_netdev *dp)
                             OVS_NUMA_UNSPEC);
 }
 
+/* Caller must have valid pointer to 'pmd'. */
+static bool
+dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd)
+{
+    return ovs_refcount_try_ref_rcu(&pmd->ref_cnt);
+}
+
+static void
+dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd)
+{
+    if (pmd && ovs_refcount_unref(&pmd->ref_cnt) == 1) {
+        ovsrcu_postpone(dp_netdev_destroy_pmd, pmd);
+    }
+}
+
+/* Given cmap position 'pos', tries to ref the next node.  If try_ref()
+ * fails, keeps checking for next node until reaching the end of cmap.
+ *
+ * Caller must unrefs the returned reference. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos)
+{
+    struct dp_netdev_pmd_thread *next;
+
+    do {
+        struct cmap_node *node;
+
+        node = cmap_next_position(&dp->poll_threads, pos);
+        next = node ? CONTAINER_OF(node, struct dp_netdev_pmd_thread, node)
+            : NULL;
+    } while (next && !dp_netdev_pmd_try_ref(next));
+
+    return next;
+}
+
 /* Configures the 'pmd' based on the input argument. */
 static void
 dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
@@ -2447,10 +2532,15 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
     pmd->index = index;
     pmd->core_id = core_id;
     pmd->numa_id = numa_id;
+
+    ovs_refcount_init(&pmd->ref_cnt);
     latch_init(&pmd->exit_latch);
     atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
     xpthread_cond_init(&pmd->cond, NULL);
     ovs_mutex_init(&pmd->cond_mutex);
+    ovs_mutex_init(&pmd->flow_mutex);
+    dpcls_init(&pmd->cls);
+    cmap_init(&pmd->flow_table);
     /* init the 'flow_cache' since there is no
      * actual thread created for NON_PMD_CORE_ID. */
     if (core_id == NON_PMD_CORE_ID) {
@@ -2460,13 +2550,26 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
                 hash_int(core_id, 0));
 }
 
-/* Stops the pmd thread, removes it from the 'dp->poll_threads'
- * and destroys the struct. */
+static void
+dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
+{
+    dp_netdev_pmd_flow_flush(pmd);
+    dpcls_destroy(&pmd->cls);
+    cmap_destroy(&pmd->flow_table);
+    ovs_mutex_destroy(&pmd->flow_mutex);
+    latch_destroy(&pmd->exit_latch);
+    xpthread_cond_destroy(&pmd->cond);
+    ovs_mutex_destroy(&pmd->cond_mutex);
+    free(pmd);
+}
+
+/* Stops the pmd thread, removes it from the 'dp->poll_threads',
+ * and unrefs the struct. */
 static void
 dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
 {
     /* Uninit the 'flow_cache' since there is
-     * no actual thread uninit it. */
+     * no actual thread uninit it for NON_PMD_CORE_ID. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
         emc_cache_uninit(&pmd->flow_cache);
     } else {
@@ -2476,10 +2579,7 @@ dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
         xpthread_join(pmd->thread, NULL);
     }
     cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
-    latch_destroy(&pmd->exit_latch);
-    xpthread_cond_destroy(&pmd->cond);
-    ovs_mutex_destroy(&pmd->cond_mutex);
-    free(pmd);
+    dp_netdev_pmd_unref(pmd);
 }
 
 /* Destroys all pmd threads. */
@@ -2551,14 +2651,6 @@ dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
 }
 
 \f
-static void *
-dp_netdev_flow_stats_new_cb(void)
-{
-    struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
-    ovs_mutex_init(&bucket->mutex);
-    return bucket;
-}
-
 /* Called after pmd threads config change.  Restarts pmd threads with
  * new configuration. */
 static void
@@ -2582,53 +2674,35 @@ dpif_netdev_get_datapath_version(void)
 }
 
 static void
-dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
-                    int cnt, int size,
+dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size,
                     uint16_t tcp_flags)
 {
     long long int now = time_msec();
-    struct dp_netdev_flow_stats *bucket;
-
-    bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
-                                        dp_netdev_flow_stats_new_cb);
-
-    ovs_mutex_lock(&bucket->mutex);
-    bucket->used = MAX(now, bucket->used);
-    bucket->packet_count += cnt;
-    bucket->byte_count += size;
-    bucket->tcp_flags |= tcp_flags;
-    ovs_mutex_unlock(&bucket->mutex);
-}
 
-static void *
-dp_netdev_stats_new_cb(void)
-{
-    struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
-    ovs_mutex_init(&bucket->mutex);
-    return bucket;
+    netdev_flow->stats.used = MAX(now, netdev_flow->stats.used);
+    netdev_flow->stats.packet_count += cnt;
+    netdev_flow->stats.byte_count += size;
+    netdev_flow->stats.tcp_flags |= tcp_flags;
 }
 
 static void
-dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt)
+dp_netdev_count_packet(struct dp_netdev_pmd_thread *pmd,
+                       enum dp_stat_type type, int cnt)
 {
-    struct dp_netdev_stats *bucket;
-
-    bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
-    ovs_mutex_lock(&bucket->mutex);
-    bucket->n[type] += cnt;
-    ovs_mutex_unlock(&bucket->mutex);
+    pmd->stats.n[type] += cnt;
 }
 
 static int
-dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
-                 struct flow *flow, struct flow_wildcards *wc,
+dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet_,
+                 struct flow *flow, struct flow_wildcards *wc, ovs_u128 *ufid,
                  enum dpif_upcall_type type, const struct nlattr *userdata,
                  struct ofpbuf *actions, struct ofpbuf *put_actions)
 {
+    struct dp_netdev *dp = pmd->dp;
     struct ofpbuf *packet = &packet_->ofpbuf;
 
     if (type == DPIF_UC_MISS) {
-        dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
+        dp_netdev_count_packet(pmd, DP_STAT_MISS, 1);
     }
 
     if (OVS_UNLIKELY(!dp->upcall_cb)) {
@@ -2657,20 +2731,20 @@ dp_netdev_upcall(struct dp_netdev *dp, struct dpif_packet *packet_,
         ds_destroy(&ds);
     }
 
-    return dp->upcall_cb(packet, flow, type, userdata, actions, wc,
-                         put_actions, dp->upcall_aux);
+    return dp->upcall_cb(packet, flow, ufid, pmd->core_id, type, userdata,
+                         actions, wc, put_actions, dp->upcall_aux);
 }
 
 static inline uint32_t
-dpif_netdev_packet_get_dp_hash(struct dpif_packet *packet,
+dpif_netdev_packet_get_dp_hash(struct dp_packet *packet,
                                const struct miniflow *mf)
 {
     uint32_t hash;
 
-    hash = dpif_packet_get_dp_hash(packet);
+    hash = dp_packet_get_dp_hash(packet);
     if (OVS_UNLIKELY(!hash)) {
         hash = miniflow_hash_5tuple(mf, 0);
-        dpif_packet_set_dp_hash(packet, hash);
+        dp_packet_set_dp_hash(packet, hash);
     }
     return hash;
 }
@@ -2682,11 +2756,11 @@ struct packet_batch {
 
     struct dp_netdev_flow *flow;
 
-    struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
+    struct dp_packet *packets[NETDEV_MAX_RX_BATCH];
 };
 
 static inline void
-packet_batch_update(struct packet_batch *batch, struct dpif_packet *packet,
+packet_batch_update(struct packet_batch *batch, struct dp_packet *packet,
                     const struct miniflow *mf)
 {
     batch->tcp_flags |= miniflow_get_tcp_flags(mf);
@@ -2719,11 +2793,11 @@ packet_batch_execute(struct packet_batch *batch,
     dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
                               actions->actions, actions->size);
 
-    dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
+    dp_netdev_count_packet(pmd, DP_STAT_HIT, batch->packet_count);
 }
 
 static inline bool
-dp_netdev_queue_batches(struct dpif_packet *pkt,
+dp_netdev_queue_batches(struct dp_packet *pkt,
                         struct dp_netdev_flow *flow, const struct miniflow *mf,
                         struct packet_batch *batches, size_t *n_batches,
                         size_t max_batches)
@@ -2758,9 +2832,9 @@ dp_netdev_queue_batches(struct dpif_packet *pkt,
 }
 
 static inline void
-dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
+dp_packet_swap(struct dp_packet **a, struct dp_packet **b)
 {
-    struct dpif_packet *tmp = *a;
+    struct dp_packet *tmp = *a;
     *a = *b;
     *b = tmp;
 }
@@ -2774,7 +2848,7 @@ dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
  * 'packets' array (they have been moved to the beginning of the vector).
  */
 static inline size_t
-emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
+emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet **packets,
                size_t cnt, struct netdev_flow_key *keys)
 {
     struct netdev_flow_key key;
@@ -2789,7 +2863,7 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
         struct dp_netdev_flow *flow;
 
         if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) {
-            dpif_packet_delete(packets[i]);
+            dp_packet_delete(packets[i]);
             continue;
         }
 
@@ -2802,7 +2876,7 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
                                                   batches, &n_batches,
                                                   ARRAY_SIZE(batches)))) {
             if (i != notfound_cnt) {
-                dpif_packet_swap(&packets[i], &packets[notfound_cnt]);
+                dp_packet_swap(&packets[i], &packets[notfound_cnt]);
             }
 
             keys[notfound_cnt++] = key;
@@ -2818,7 +2892,7 @@ emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
 
 static inline void
 fast_path_processing(struct dp_netdev_pmd_thread *pmd,
-                     struct dpif_packet **packets, size_t cnt,
+                     struct dp_packet **packets, size_t cnt,
                      struct netdev_flow_key *keys)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
@@ -2838,10 +2912,11 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
         /* Key length is needed in all the cases, hash computed on demand. */
         keys[i].len = netdev_flow_key_size(count_1bits(keys[i].mf.map));
     }
-    any_miss = !dpcls_lookup(&dp->cls, keys, rules, cnt);
+    any_miss = !dpcls_lookup(&pmd->cls, keys, rules, cnt);
     if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
         uint64_t actions_stub[512 / 8], slow_stub[512 / 8];
         struct ofpbuf actions, put_actions;
+        ovs_u128 ufid;
 
         ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub);
         ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub);
@@ -2859,7 +2934,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
             /* It's possible that an earlier slow path execution installed
              * a rule covering this flow.  In this case, it's a lot cheaper
              * to catch it here than execute a miss. */
-            netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]);
+            netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
             if (netdev_flow) {
                 rules[i] = &netdev_flow->cr;
                 continue;
@@ -2870,8 +2945,9 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
             ofpbuf_clear(&actions);
             ofpbuf_clear(&put_actions);
 
-            error = dp_netdev_upcall(dp, packets[i], &match.flow, &match.wc,
-                                     DPIF_UC_MISS, NULL, &actions,
+            dpif_flow_hash(dp->dpif, &match.flow, sizeof match.flow, &ufid);
+            error = dp_netdev_upcall(pmd, packets[i], &match.flow, &match.wc,
+                                     &ufid, DPIF_UC_MISS, NULL, &actions,
                                      &put_actions);
             if (OVS_UNLIKELY(error && error != ENOSPC)) {
                 continue;
@@ -2895,14 +2971,14 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                  * mutex lock outside the loop, but that's an awful long time
                  * to be locking everyone out of making flow installs.  If we
                  * move to a per-core classifier, it would be reasonable. */
-                ovs_mutex_lock(&dp->flow_mutex);
-                netdev_flow = dp_netdev_lookup_flow(dp, &keys[i]);
+                ovs_mutex_lock(&pmd->flow_mutex);
+                netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]);
                 if (OVS_LIKELY(!netdev_flow)) {
-                    netdev_flow = dp_netdev_flow_add(dp, &match,
+                    netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid,
                                                      ofpbuf_data(add_actions),
                                                      ofpbuf_size(add_actions));
                 }
-                ovs_mutex_unlock(&dp->flow_mutex);
+                ovs_mutex_unlock(&pmd->flow_mutex);
 
                 emc_insert(flow_cache, &keys[i], netdev_flow);
             }
@@ -2916,17 +2992,17 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 
         for (i = 0; i < cnt; i++) {
             if (OVS_UNLIKELY(!rules[i])) {
-                dpif_packet_delete(packets[i]);
+                dp_packet_delete(packets[i]);
                 dropped_cnt++;
             }
         }
 
-        dp_netdev_count_packet(dp, DP_STAT_LOST, dropped_cnt);
+        dp_netdev_count_packet(pmd, DP_STAT_LOST, dropped_cnt);
     }
 
     n_batches = 0;
     for (i = 0; i < cnt; i++) {
-        struct dpif_packet *packet = packets[i];
+        struct dp_packet *packet = packets[i];
         struct dp_netdev_flow *flow;
 
         if (OVS_UNLIKELY(!rules[i])) {
@@ -2947,7 +3023,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
 
 static void
 dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
-                struct dpif_packet **packets, int cnt)
+                struct dp_packet **packets, int cnt)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
     const size_t PKT_ARRAY_SIZE = cnt;
@@ -2978,13 +3054,13 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
 }
 
 static void
-dp_netdev_drop_packets(struct dpif_packet ** packets, int cnt, bool may_steal)
+dp_netdev_drop_packets(struct dp_packet ** packets, int cnt, bool may_steal)
 {
     if (may_steal) {
         int i;
 
         for (i = 0; i < cnt; i++) {
-            dpif_packet_delete(packets[i]);
+            dp_packet_delete(packets[i]);
         }
     }
 }
@@ -2992,7 +3068,7 @@ dp_netdev_drop_packets(struct dpif_packet ** packets, int cnt, bool may_steal)
 static int
 push_tnl_action(const struct dp_netdev *dp,
                    const struct nlattr *attr,
-                   struct dpif_packet **packets, int cnt)
+                   struct dp_packet **packets, int cnt)
 {
     struct dp_netdev_port *tun_port;
     const struct ovs_action_push_tnl *data;
@@ -3009,18 +3085,18 @@ push_tnl_action(const struct dp_netdev *dp,
 }
 
 static void
-dp_netdev_clone_pkt_batch(struct dpif_packet **tnl_pkt,
-                          struct dpif_packet **packets, int cnt)
+dp_netdev_clone_pkt_batch(struct dp_packet **tnl_pkt,
+                          struct dp_packet **packets, int cnt)
 {
     int i;
 
     for (i = 0; i < cnt; i++) {
-        tnl_pkt[i] = dpif_packet_clone(packets[i]);
+        tnl_pkt[i] = dp_packet_clone(packets[i]);
     }
 }
 
 static void
-dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
+dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
               const struct nlattr *a, bool may_steal)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
@@ -3043,7 +3119,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 
     case OVS_ACTION_ATTR_TUNNEL_PUSH:
         if (*depth < MAX_RECIRC_DEPTH) {
-            struct dpif_packet *tnl_pkt[NETDEV_MAX_RX_BATCH];
+            struct dp_packet *tnl_pkt[NETDEV_MAX_RX_BATCH];
             int err;
 
             if (!may_steal) {
@@ -3069,7 +3145,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 
             p = dp_netdev_lookup_port(dp, portno);
             if (p) {
-                struct dpif_packet *tnl_pkt[NETDEV_MAX_RX_BATCH];
+                struct dp_packet *tnl_pkt[NETDEV_MAX_RX_BATCH];
                 int err;
 
                 if (!may_steal) {
@@ -3100,6 +3176,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
             const struct nlattr *userdata;
             struct ofpbuf actions;
             struct flow flow;
+            ovs_u128 ufid;
 
             userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
             ofpbuf_init(&actions, 0);
@@ -3110,15 +3187,16 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                 ofpbuf_clear(&actions);
 
                 flow_extract(&packets[i]->ofpbuf, &packets[i]->md, &flow);
-                error = dp_netdev_upcall(dp, packets[i], &flow, NULL,
-                                         DPIF_UC_ACTION, userdata, &actions,
+                dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
+                error = dp_netdev_upcall(pmd, packets[i], &flow, NULL, &ufid,
+                                         DPIF_UC_ACTION, userdata,&actions,
                                          NULL);
                 if (!error || error == ENOSPC) {
                     dp_netdev_execute_actions(pmd, &packets[i], 1, may_steal,
                                               ofpbuf_data(&actions),
                                               ofpbuf_size(&actions));
                 } else if (may_steal) {
-                    dpif_packet_delete(packets[i]);
+                    dp_packet_delete(packets[i]);
                 }
             }
             ofpbuf_uninit(&actions);
@@ -3133,15 +3211,15 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 
             (*depth)++;
             for (i = 0; i < cnt; i++) {
-                struct dpif_packet *recirc_pkt;
+                struct dp_packet *recirc_pkt;
 
                 recirc_pkt = (may_steal) ? packets[i]
-                                    : dpif_packet_clone(packets[i]);
+                                    : dp_packet_clone(packets[i]);
 
                 recirc_pkt->md.recirc_id = nl_attr_get_u32(a);
 
                 /* Hash is private to each packet */
-                recirc_pkt->md.dp_hash = dpif_packet_get_dp_hash(packets[i]);
+                recirc_pkt->md.dp_hash = dp_packet_get_dp_hash(packets[i]);
 
                 dp_netdev_input(pmd, &recirc_pkt, 1);
             }
@@ -3171,7 +3249,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 
 static void
 dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
-                          struct dpif_packet **packets, int cnt,
+                          struct dp_packet **packets, int cnt,
                           bool may_steal,
                           const struct nlattr *actions, size_t actions_len)
 {
@@ -3457,12 +3535,12 @@ static inline bool
 dpcls_rule_matches_key(const struct dpcls_rule *rule,
                        const struct netdev_flow_key *target)
 {
-    const uint32_t *keyp = rule->flow.mf.inline_values;
-    const uint32_t *maskp = rule->mask->mf.inline_values;
-    uint32_t target_u32;
+    const uint64_t *keyp = rule->flow.mf.inline_values;
+    const uint64_t *maskp = rule->mask->mf.inline_values;
+    uint64_t target_u64;
 
-    NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(target_u32, target, rule->flow.mf.map) {
-        if (OVS_UNLIKELY((target_u32 & *maskp++) != *keyp++)) {
+    NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(target_u64, target, rule->flow.mf.map) {
+        if (OVS_UNLIKELY((target_u64 & *maskp++) != *keyp++)) {
             return false;
         }
     }