dpif-netdev: Move rxq management into functions.
[cascardo/ovs.git] / lib / dpif-netdev.c
index 9abb948..500e7cc 100644 (file)
@@ -43,6 +43,7 @@
 #include "flow.h"
 #include "cmap.h"
 #include "coverage.h"
+#include "hmapx.h"
 #include "latch.h"
 #include "list.h"
 #include "match.h"
@@ -221,9 +222,7 @@ struct dp_netdev {
      * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
     ovsthread_key_t per_pmd_key;
 
-    /* Number of rx queues for each dpdk interface and the cpu mask
-     * for pin of pmd threads. */
-    size_t n_dpdk_rxqs;
+    /* Cpu mask for pin of pmd threads. */
     char *pmd_cmask;
     uint64_t last_tnl_conf_seq;
 };
@@ -254,6 +253,8 @@ struct dp_netdev_port {
     struct netdev_rxq **rxq;
     struct ovs_refcount ref_cnt;
     char *type;                 /* Port type as requested by user. */
+    int latest_requested_n_rxq; /* Latest requested from netdev number
+                                   of rx queues. */
 };
 
 /* Contained by struct dp_netdev_flow's 'stats' member.  */
@@ -478,7 +479,9 @@ static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       const struct nlattr *actions,
                                       size_t actions_len);
 static void dp_netdev_input(struct dp_netdev_pmd_thread *,
-                            struct dp_packet **, int cnt);
+                            struct dp_packet **, int cnt, odp_port_t port_no);
+static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
+                                  struct dp_packet **, int cnt);
 
 static void dp_netdev_disable_upcall(struct dp_netdev *);
 static void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
@@ -494,6 +497,13 @@ dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos);
 static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
 static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_del_port_from_pmd(struct dp_netdev_port *port,
+                                        struct dp_netdev_pmd_thread *pmd);
+static void dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
+                                             struct dp_netdev_port *port);
+static void
+dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port);
 static void
 dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
                          struct dp_netdev_port *port, struct netdev_rxq *rx);
@@ -565,8 +575,9 @@ get_dp_netdev(const struct dpif *dpif)
 }
 \f
 enum pmd_info_type {
-    PMD_INFO_SHOW_STATS,  /* show how cpu cycles are spent */
-    PMD_INFO_CLEAR_STATS  /* set the cycles count to 0 */
+    PMD_INFO_SHOW_STATS,  /* Show how cpu cycles are spent. */
+    PMD_INFO_CLEAR_STATS, /* Set the cycles count to 0. */
+    PMD_INFO_SHOW_RXQ     /* Show poll-lists of pmd threads. */
 };
 
 static void
@@ -674,6 +685,35 @@ pmd_info_clear_stats(struct ds *reply OVS_UNUSED,
     }
 }
 
+static void
+pmd_info_show_rxq(struct ds *reply, struct dp_netdev_pmd_thread *pmd)
+{
+    if (pmd->core_id != NON_PMD_CORE_ID) {
+        struct rxq_poll *poll;
+        const char *prev_name = NULL;
+
+        ds_put_format(reply, "pmd thread numa_id %d core_id %u:\n",
+                      pmd->numa_id, pmd->core_id);
+
+        ovs_mutex_lock(&pmd->poll_mutex);
+        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
+            const char *name = netdev_get_name(poll->port->netdev);
+
+            if (!prev_name || strcmp(name, prev_name)) {
+                if (prev_name) {
+                    ds_put_cstr(reply, "\n");
+                }
+                ds_put_format(reply, "\tport: %s\tqueue-id:",
+                              netdev_get_name(poll->port->netdev));
+            }
+            ds_put_format(reply, " %d", netdev_rxq_get_queue_id(poll->rx));
+            prev_name = name;
+        }
+        ovs_mutex_unlock(&pmd->poll_mutex);
+        ds_put_cstr(reply, "\n");
+    }
+}
+
 static void
 dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
                      void *aux)
@@ -700,22 +740,26 @@ dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[],
     }
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        unsigned long long stats[DP_N_STATS];
-        uint64_t cycles[PMD_N_CYCLES];
-        int i;
+        if (type == PMD_INFO_SHOW_RXQ) {
+            pmd_info_show_rxq(&reply, pmd);
+        } else {
+            unsigned long long stats[DP_N_STATS];
+            uint64_t cycles[PMD_N_CYCLES];
+            int i;
 
-        /* Read current stats and cycle counters */
-        for (i = 0; i < ARRAY_SIZE(stats); i++) {
-            atomic_read_relaxed(&pmd->stats.n[i], &stats[i]);
-        }
-        for (i = 0; i < ARRAY_SIZE(cycles); i++) {
-            atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]);
-        }
+            /* Read current stats and cycle counters */
+            for (i = 0; i < ARRAY_SIZE(stats); i++) {
+                atomic_read_relaxed(&pmd->stats.n[i], &stats[i]);
+            }
+            for (i = 0; i < ARRAY_SIZE(cycles); i++) {
+                atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]);
+            }
 
-        if (type == PMD_INFO_CLEAR_STATS) {
-            pmd_info_clear_stats(&reply, pmd, stats, cycles);
-        } else if (type == PMD_INFO_SHOW_STATS) {
-            pmd_info_show_stats(&reply, pmd, stats, cycles);
+            if (type == PMD_INFO_CLEAR_STATS) {
+                pmd_info_clear_stats(&reply, pmd, stats, cycles);
+            } else if (type == PMD_INFO_SHOW_STATS) {
+                pmd_info_show_stats(&reply, pmd, stats, cycles);
+            }
         }
     }
 
@@ -729,7 +773,8 @@ static int
 dpif_netdev_init(void)
 {
     static enum pmd_info_type show_aux = PMD_INFO_SHOW_STATS,
-                              clear_aux = PMD_INFO_CLEAR_STATS;
+                              clear_aux = PMD_INFO_CLEAR_STATS,
+                              poll_aux = PMD_INFO_SHOW_RXQ;
 
     unixctl_command_register("dpif-netdev/pmd-stats-show", "[dp]",
                              0, 1, dpif_netdev_pmd_info,
@@ -737,6 +782,9 @@ dpif_netdev_init(void)
     unixctl_command_register("dpif-netdev/pmd-stats-clear", "[dp]",
                              0, 1, dpif_netdev_pmd_info,
                              (void *)&clear_aux);
+    unixctl_command_register("dpif-netdev/pmd-rxq-show", "[dp]",
+                             0, 1, dpif_netdev_pmd_info,
+                             (void *)&poll_aux);
     return 0;
 }
 
@@ -864,7 +912,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovsthread_key_create(&dp->per_pmd_key, NULL);
 
     dp_netdev_set_nonpmd(dp);
-    dp->n_dpdk_rxqs = NR_QUEUE;
 
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
@@ -927,15 +974,16 @@ dp_netdev_free(struct dp_netdev *dp)
     shash_find_and_delete(&dp_netdevs, dp->name);
 
     dp_netdev_destroy_all_pmds(dp);
-    cmap_destroy(&dp->poll_threads);
     ovs_mutex_destroy(&dp->non_pmd_mutex);
     ovsthread_key_delete(dp->per_pmd_key);
 
     ovs_mutex_lock(&dp->port_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
+        /* PMD threads are destroyed here. do_del_port() cannot quiesce */
         do_del_port(dp, port);
     }
     ovs_mutex_unlock(&dp->port_mutex);
+    cmap_destroy(&dp->poll_threads);
 
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
@@ -1091,7 +1139,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         /* There can only be ovs_numa_get_n_cores() pmd threads,
          * so creates a txq for each, and one extra for the non
          * pmd threads. */
-        error = netdev_set_multiq(netdev, n_cores + 1, dp->n_dpdk_rxqs);
+        error = netdev_set_multiq(netdev, n_cores + 1,
+                                  netdev_requested_n_rxq(netdev));
         if (error && (error != EOPNOTSUPP)) {
             VLOG_ERR("%s, cannot set multiq", devname);
             return errno;
@@ -1102,6 +1151,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     port->netdev = netdev;
     port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
     port->type = xstrdup(type);
+    port->latest_requested_n_rxq = netdev_requested_n_rxq(netdev);
     for (i = 0; i < netdev_n_rxq(netdev); i++) {
         error = netdev_rxq_open(netdev, &port->rxq[i], i);
         if (error
@@ -1133,26 +1183,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
 
     if (netdev_is_pmd(netdev)) {
-        int numa_id = netdev_get_numa_id(netdev);
-        struct dp_netdev_pmd_thread *pmd;
-
-        /* Cannot create pmd threads for invalid numa node. */
-        ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
-
-        for (i = 0; i < netdev_n_rxq(netdev); i++) {
-            pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
-            if (!pmd) {
-                /* There is no pmd threads on this numa node. */
-                dp_netdev_set_pmds_on_numa(dp, numa_id);
-                /* Assigning of rx queues done. */
-                break;
-            }
-
-            ovs_mutex_lock(&pmd->poll_mutex);
-            dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
-            ovs_mutex_unlock(&pmd->poll_mutex);
-            dp_netdev_reload_pmd__(pmd);
-        }
+        dp_netdev_add_port_to_pmds(dp, port);
     }
     seq_change(dp->port_seq);
 
@@ -1340,29 +1371,7 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
         if (!has_pmd_port_for_numa(dp, numa_id)) {
             dp_netdev_del_pmds_on_numa(dp, numa_id);
         } else {
-            struct dp_netdev_pmd_thread *pmd;
-            struct rxq_poll *poll, *next;
-
-            CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-                if (pmd->numa_id == numa_id) {
-                    bool found = false;
-
-                    ovs_mutex_lock(&pmd->poll_mutex);
-                    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
-                        if (poll->port == port) {
-                            found = true;
-                            port_unref(poll->port);
-                            list_remove(&poll->node);
-                            pmd->poll_cnt--;
-                            free(poll);
-                        }
-                    }
-                    ovs_mutex_unlock(&pmd->poll_mutex);
-                    if (found) {
-                        dp_netdev_reload_pmd__(pmd);
-                    }
-                }
-            }
+            dp_netdev_del_port_from_all_pmds(dp, port);
         }
     }
 
@@ -2405,32 +2414,42 @@ dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops)
 /* Returns true if the configuration for rx queues or cpu mask
  * is changed. */
 static bool
-pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask)
+pmd_config_changed(const struct dp_netdev *dp, const char *cmask)
 {
-    if (dp->n_dpdk_rxqs != rxqs) {
-        return true;
-    } else {
-        if (dp->pmd_cmask != NULL && cmask != NULL) {
-            return strcmp(dp->pmd_cmask, cmask);
-        } else {
-            return (dp->pmd_cmask != NULL || cmask != NULL);
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        struct netdev *netdev = port->netdev;
+        int requested_n_rxq = netdev_requested_n_rxq(netdev);
+        if (netdev_is_pmd(netdev)
+            && port->latest_requested_n_rxq != requested_n_rxq) {
+            return true;
         }
     }
+
+    if (dp->pmd_cmask != NULL && cmask != NULL) {
+        return strcmp(dp->pmd_cmask, cmask);
+    } else {
+        return (dp->pmd_cmask != NULL || cmask != NULL);
+    }
 }
 
 /* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */
 static int
-dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
+dpif_netdev_pmd_set(struct dpif *dpif, const char *cmask)
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
 
-    if (pmd_config_changed(dp, n_rxqs, cmask)) {
+    if (pmd_config_changed(dp, cmask)) {
         struct dp_netdev_port *port;
 
         dp_netdev_destroy_all_pmds(dp);
 
         CMAP_FOR_EACH (port, node, &dp->ports) {
-            if (netdev_is_pmd(port->netdev)) {
+            struct netdev *netdev = port->netdev;
+            int requested_n_rxq = netdev_requested_n_rxq(netdev);
+            if (netdev_is_pmd(port->netdev)
+                && port->latest_requested_n_rxq != requested_n_rxq) {
                 int i, err;
 
                 /* Closes the existing 'rxq's. */
@@ -2442,14 +2461,14 @@ dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
                 /* Sets the new rx queue config.  */
                 err = netdev_set_multiq(port->netdev,
                                         ovs_numa_get_n_cores() + 1,
-                                        n_rxqs);
+                                        requested_n_rxq);
                 if (err && (err != EOPNOTSUPP)) {
                     VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
                              " %u", netdev_get_name(port->netdev),
-                             n_rxqs);
+                             requested_n_rxq);
                     return err;
                 }
-
+                port->latest_requested_n_rxq = requested_n_rxq;
                 /* If the set_multiq() above succeeds, reopens the 'rxq's. */
                 port->rxq = xrealloc(port->rxq, sizeof *port->rxq
                                      * netdev_n_rxq(port->netdev));
@@ -2458,8 +2477,6 @@ dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
                 }
             }
         }
-        dp->n_dpdk_rxqs = n_rxqs;
-
         /* Reconfigures the cpu mask. */
         ovs_numa_set_cpu_mask(cmask);
         free(dp->pmd_cmask);
@@ -2555,16 +2572,10 @@ dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
     error = netdev_rxq_recv(rxq, packets, &cnt);
     cycles_count_end(pmd, PMD_CYCLES_POLLING);
     if (!error) {
-        int i;
-
         *recirc_depth_get() = 0;
 
-        /* XXX: initialize md in netdev implementation. */
-        for (i = 0; i < cnt; i++) {
-            pkt_metadata_init(&packets[i]->md, port->port_no);
-        }
         cycles_count_start(pmd);
-        dp_netdev_input(pmd, packets, cnt);
+        dp_netdev_input(pmd, packets, cnt, port->port_no);
         cycles_count_end(pmd, PMD_CYCLES_PROCESSING);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -2678,8 +2689,9 @@ reload:
 
     /* List port/core affinity */
     for (i = 0; i < poll_cnt; i++) {
-       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
-                 netdev_get_name(poll_list[i].port->netdev));
+       VLOG_DBG("Core %d processing port \'%s\' with queue-id %d\n",
+                pmd->core_id, netdev_get_name(poll_list[i].port->netdev),
+                netdev_rxq_get_queue_id(poll_list[i].rx));
     }
 
     /* Signal here to make sure the pmd finishes
@@ -2881,8 +2893,6 @@ dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd)
 static void
 dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
 {
-    struct rxq_poll *poll;
-
     /* Uninit the 'flow_cache' since there is
      * no actual thread uninit it for NON_PMD_CORE_ID. */
     if (pmd->core_id == NON_PMD_CORE_ID) {
@@ -2895,10 +2905,7 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct dp_netdev_pmd_thread *pmd)
     }
 
     /* Unref all ports and free poll_list. */
-    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
-        port_unref(poll->port);
-        free(poll);
-    }
+    dp_netdev_pmd_clear_poll_list(pmd);
 
     /* Purges the 'pmd''s flows after stopping the thread, but before
      * destroying the flows, so that the flow stats can be collected. */
@@ -2914,10 +2921,24 @@ static void
 dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
 {
     struct dp_netdev_pmd_thread *pmd;
+    struct dp_netdev_pmd_thread **pmd_list;
+    size_t k = 0, n_pmds;
+
+    n_pmds = cmap_count(&dp->poll_threads);
+    pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
-        dp_netdev_del_pmd(dp, pmd);
+        /* We cannot call dp_netdev_del_pmd(), since it alters
+         * 'dp->poll_threads' (while we're iterating it) and it
+         * might quiesce. */
+        ovs_assert(k < n_pmds);
+        pmd_list[k++] = pmd;
+    }
+
+    for (size_t i = 0; i < k; i++) {
+        dp_netdev_del_pmd(dp, pmd_list[i]);
     }
+    free(pmd_list);
 }
 
 /* Deletes all pmd threads on numa node 'numa_id' and
@@ -2928,18 +2949,28 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
     struct dp_netdev_pmd_thread *pmd;
     int n_pmds_on_numa, n_pmds;
     int *free_idx, k = 0;
+    struct dp_netdev_pmd_thread **pmd_list;
 
     n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
-    free_idx = xmalloc(n_pmds_on_numa * sizeof *free_idx);
+    free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
+    pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
 
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        /* We cannot call dp_netdev_del_pmd(), since it alters
+         * 'dp->poll_threads' (while we're iterating it) and it
+         * might quiesce. */
         if (pmd->numa_id == numa_id) {
             atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
+            pmd_list[k] = pmd;
+            ovs_assert(k < n_pmds_on_numa);
             k++;
-            dp_netdev_del_pmd(dp, pmd);
         }
     }
 
+    for (int i = 0; i < k; i++) {
+        dp_netdev_del_pmd(dp, pmd_list[i]);
+    }
+
     n_pmds = get_n_pmd_threads(dp);
     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
         int old_tx_qid;
@@ -2953,9 +2984,66 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
         }
     }
 
+    free(pmd_list);
     free(free_idx);
 }
 
+/* Deletes all rx queues from pmd->poll_list. */
+static void
+dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd)
+{
+    struct rxq_poll *poll;
+
+    ovs_mutex_lock(&pmd->poll_mutex);
+    LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+        port_unref(poll->port);
+        free(poll);
+    }
+    pmd->poll_cnt = 0;
+    ovs_mutex_unlock(&pmd->poll_mutex);
+}
+
+/* Deletes all rx queues of 'port' from poll_list of pmd thread and
+ * reloads it if poll_list was changed. */
+static void
+dp_netdev_del_port_from_pmd(struct dp_netdev_port *port,
+                            struct dp_netdev_pmd_thread *pmd)
+{
+    struct rxq_poll *poll, *next;
+    bool found = false;
+
+    ovs_mutex_lock(&pmd->poll_mutex);
+    LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+        if (poll->port == port) {
+            found = true;
+            port_unref(poll->port);
+            list_remove(&poll->node);
+            pmd->poll_cnt--;
+            free(poll);
+        }
+    }
+    ovs_mutex_unlock(&pmd->poll_mutex);
+    if (found) {
+        dp_netdev_reload_pmd__(pmd);
+    }
+}
+
+/* Deletes all rx queues of 'port' from all pmd threads of dp and
+ * reloads them if needed. */
+static void
+dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
+                                 struct dp_netdev_port *port)
+{
+    int numa_id = netdev_get_numa_id(port->netdev);
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            dp_netdev_del_port_from_pmd(port, pmd);
+       }
+    }
+}
+
 /* Returns PMD thread from this numa node with fewer rx queues to poll.
  * Returns NULL if there is no PMD threads on this numa node.
  * Can be called safely only by main thread. */
@@ -2992,6 +3080,45 @@ dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
     pmd->poll_cnt++;
 }
 
+/* Distributes all rx queues of 'port' between all PMD threads and reloads
+ * them if needed. */
+static void
+dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
+{
+    int numa_id = netdev_get_numa_id(port->netdev);
+    struct dp_netdev_pmd_thread *pmd;
+    struct hmapx to_reload;
+    struct hmapx_node *node;
+    int i;
+
+    hmapx_init(&to_reload);
+    /* Cannot create pmd threads for invalid numa node. */
+    ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+
+    for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+        pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+        if (!pmd) {
+            /* There is no pmd threads on this numa node. */
+            dp_netdev_set_pmds_on_numa(dp, numa_id);
+            /* Assigning of rx queues done. */
+            break;
+        }
+
+        ovs_mutex_lock(&pmd->poll_mutex);
+        dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+        ovs_mutex_unlock(&pmd->poll_mutex);
+
+        hmapx_add(&to_reload, pmd);
+    }
+
+    HMAPX_FOR_EACH (node, &to_reload) {
+        pmd = (struct dp_netdev_pmd_thread *) node->data;
+        dp_netdev_reload_pmd__(pmd);
+    }
+
+    hmapx_destroy(&to_reload);
+}
+
 /* Checks the numa node id of 'netdev' and starts pmd threads for
  * the numa node. */
 static void
@@ -3284,48 +3411,61 @@ dp_netdev_queue_batches(struct dp_packet *pkt,
 }
 
 /* Try to process all ('cnt') the 'packets' using only the exact match cache
- * 'flow_cache'. If a flow is not found for a packet 'packets[i]', the
+ * 'pmd->flow_cache'. If a flow is not found for a packet 'packets[i]', the
  * miniflow is copied into 'keys' and the packet pointer is moved at the
  * beginning of the 'packets' array.
  *
  * The function returns the number of packets that needs to be processed in the
  * 'packets' array (they have been moved to the beginning of the vector).
+ *
+ * If 'md_is_valid' is false, the metadata in 'packets' is not valid and must be
+ * initialized by this function using 'port_no'.
  */
 static inline size_t
 emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet **packets,
                size_t cnt, struct netdev_flow_key *keys,
-               struct packet_batch batches[], size_t *n_batches)
+               struct packet_batch batches[], size_t *n_batches,
+               bool md_is_valid, odp_port_t port_no)
 {
     struct emc_cache *flow_cache = &pmd->flow_cache;
+    struct netdev_flow_key *key = &keys[0];
     size_t i, n_missed = 0, n_dropped = 0;
 
     for (i = 0; i < cnt; i++) {
         struct dp_netdev_flow *flow;
+        struct dp_packet *packet = packets[i];
 
-        if (OVS_UNLIKELY(dp_packet_size(packets[i]) < ETH_HEADER_LEN)) {
-            dp_packet_delete(packets[i]);
+        if (OVS_UNLIKELY(dp_packet_size(packet) < ETH_HEADER_LEN)) {
+            dp_packet_delete(packet);
             n_dropped++;
             continue;
         }
 
         if (i != cnt - 1) {
-            /* Prefetch next packet data */
+            /* Prefetch next packet data and metadata. */
             OVS_PREFETCH(dp_packet_data(packets[i+1]));
+            pkt_metadata_prefetch_init(&packets[i+1]->md);
         }
 
-        struct netdev_flow_key *key = &keys[n_missed];
-        miniflow_extract(packets[i], &key->mf);
+        if (!md_is_valid) {
+            pkt_metadata_init(&packet->md, port_no);
+        }
+        miniflow_extract(packet, &key->mf);
         key->len = 0; /* Not computed yet. */
-        key->hash = dpif_netdev_packet_get_rss_hash(packets[i], &key->mf);
+        key->hash = dpif_netdev_packet_get_rss_hash(packet, &key->mf);
 
         flow = emc_lookup(flow_cache, key);
         if (OVS_LIKELY(flow)) {
-            dp_netdev_queue_batches(packets[i], flow, &key->mf, batches,
+            dp_netdev_queue_batches(packet, flow, &key->mf, batches,
                                     n_batches);
         } else {
             /* Exact match cache missed. Group missed packets together at
              * the beginning of the 'packets' array.  */
-            packets[n_missed++] = packets[i];
+            packets[n_missed] = packet;
+            /* 'key[n_missed]' contains the key of the current packet and it
+             * must be returned to the caller. The next key should be extracted
+             * to 'keys[n_missed + 1]'. */
+            key = &keys[++n_missed];
         }
     }
 
@@ -3473,9 +3613,16 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
     dp_netdev_count_packet(pmd, DP_STAT_LOST, lost_cnt);
 }
 
+/* Packets enter the datapath from a port (or from recirculation) here.
+ *
+ * For performance reasons a caller may choose not to initialize the metadata
+ * in 'packets': in this case 'mdinit' is false and this function needs to
+ * initialize it using 'port_no'.  If the metadata in 'packets' is already
+ * valid, 'md_is_valid' must be true and 'port_no' will be ignored. */
 static void
-dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
-                struct dp_packet **packets, int cnt)
+dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
+                  struct dp_packet **packets, int cnt,
+                  bool md_is_valid, odp_port_t port_no)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
     const size_t PKT_ARRAY_SIZE = cnt;
@@ -3489,7 +3636,8 @@ dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
     size_t newcnt, n_batches, i;
 
     n_batches = 0;
-    newcnt = emc_processing(pmd, packets, cnt, keys, batches, &n_batches);
+    newcnt = emc_processing(pmd, packets, cnt, keys, batches, &n_batches,
+                            md_is_valid, port_no);
     if (OVS_UNLIKELY(newcnt)) {
         fast_path_processing(pmd, packets, newcnt, keys, batches, &n_batches);
     }
@@ -3503,6 +3651,21 @@ dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
     }
 }
 
+static void
+dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
+                struct dp_packet **packets, int cnt,
+                odp_port_t port_no)
+{
+     dp_netdev_input__(pmd, packets, cnt, false, port_no);
+}
+
+static void
+dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
+                      struct dp_packet **packets, int cnt)
+{
+     dp_netdev_input__(pmd, packets, cnt, true, 0);
+}
+
 struct dp_netdev_execute_aux {
     struct dp_netdev_pmd_thread *pmd;
 };
@@ -3606,7 +3769,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
             err = push_tnl_action(dp, a, packets, cnt);
             if (!err) {
                 (*depth)++;
-                dp_netdev_input(pmd, packets, cnt);
+                dp_netdev_recirculate(pmd, packets, cnt);
                 (*depth)--;
             } else {
                 dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal);
@@ -3637,7 +3800,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
                     }
 
                     (*depth)++;
-                    dp_netdev_input(pmd, packets, cnt);
+                    dp_netdev_recirculate(pmd, packets, cnt);
                     (*depth)--;
                 } else {
                     dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal);
@@ -3695,7 +3858,7 @@ dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt,
             }
 
             (*depth)++;
-            dp_netdev_input(pmd, packets, cnt);
+            dp_netdev_recirculate(pmd, packets, cnt);
             (*depth)--;
 
             return;