dpif-netdev: Create multiple pmd threads by default.
authorAlex Wang <alexw@nicira.com>
Fri, 5 Sep 2014 21:14:20 +0000 (14:14 -0700)
committerAlex Wang <alexw@nicira.com>
Mon, 15 Sep 2014 18:43:49 +0000 (11:43 -0700)
With this commit, ovs by default will create one pmd thread
for each numa node and pin the pmd thread to available cpu
core on the numa node.

NON_PMD_CORE_ID (currently 0) is used to reserve a particular
cpu core for the I/O of all non-pmd threads.  No pmd thread
can be pinned to this reserved core.

As side-effects of this commit:

-  pmd thread will not be created, if there is no dpdk interface
   from the corresponding numa node added to ovs.

- the exact-match cache for non-pmd threads is removed from
  'struct dp_netdev'.  Instead, all non-pmd threads will use
  the exact-match cache defined in the 'struct dp_netdev_pmd_thread'
  for NON_PMD_CORE_ID.

- the rx packet processing functions are refactored to use
  'struct dp_netdev_pmd_thread' as input.

- the 'netdev_send()' function will be called with the proper
  queue id.

- both pmd and non-pmd threads can call the dpif_netdev_execute().
  so, use a per-thread key to help recognize the calling thread.

Signed-off-by: Alex Wang <alexw@nicira.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
lib/dpif-netdev.c
lib/dpif-netdev.h
lib/netdev-dpdk.c

index 12e3571..3f69219 100644 (file)
@@ -159,7 +159,6 @@ struct emc_cache {
  *
  *    dp_netdev_mutex (global)
  *    port_mutex
- *    emc_mutex
  *    flow_mutex
  */
 struct dp_netdev {
@@ -196,17 +195,16 @@ struct dp_netdev {
     upcall_callback *upcall_cb;  /* Callback function for executing upcalls. */
     void *upcall_aux;
 
-    /* Forwarding threads. */
-    struct latch exit_latch;
-    struct pmd_thread *pmd_threads;
-    size_t n_pmd_threads;
-    int pmd_count;
-
-    /* Exact match cache for non-pmd devices.
-     * Pmd devices use instead each thread's flow_cache for this purpose.
-     * Protected by emc_mutex */
-    struct emc_cache flow_cache OVS_GUARDED;
-    struct ovs_mutex emc_mutex;
+    /* Stores all 'struct dp_netdev_pmd_thread's. */
+    struct cmap poll_threads;
+
+    /* Protects the access of the 'struct dp_netdev_pmd_thread'
+     * instance for non-pmd thread. */
+    struct ovs_mutex non_pmd_mutex;
+
+    /* Each pmd thread will store its pointer to
+     * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */
+    ovsthread_key_t per_pmd_key;
 };
 
 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
@@ -341,15 +339,25 @@ static void dp_netdev_actions_free(struct dp_netdev_actions *);
  *
  * DPDK used PMD for accessing NIC.
  *
- * A thread that receives packets from PMD ports, looks them up in the flow
- * table, and executes the actions it finds.
+ * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for
+ * I/O of all non-pmd threads.  There will be no actual thread created
+ * for the instance.
  **/
-struct pmd_thread {
+struct dp_netdev_pmd_thread {
     struct dp_netdev *dp;
+    struct cmap_node node;          /* In 'dp->poll_threads'. */
+    /* Per thread exact-match cache.  Note, the instance for cpu core
+     * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly
+     * need to be protected (e.g. by 'dp_netdev_mutex').  All other
+     * instances will only be accessed by its own pmd thread. */
     struct emc_cache flow_cache;
+    struct latch exit_latch;        /* For terminating the pmd thread. */
+    atomic_uint change_seq;         /* For reloading pmd ports. */
     pthread_t thread;
-    int id;
-    atomic_uint change_seq;
+    int index;                      /* Idx of this pmd thread among pmd*/
+                                    /* threads on same numa node. */
+    int core_id;                    /* CPU core id of this pmd thread. */
+    int numa_id;                    /* numa node id of this pmd thread. */
 };
 
 #define PMD_INITIAL_SEQ 1
@@ -375,18 +383,22 @@ static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
     OVS_REQUIRES(dp->port_mutex);
 static int dpif_netdev_open(const struct dpif_class *, const char *name,
                             bool create, struct dpif **);
-static void dp_netdev_execute_actions(struct dp_netdev *dp,
+static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                                       struct dpif_packet **, int c,
                                       bool may_steal, struct pkt_metadata *,
-                                      struct emc_cache *flow_cache,
                                       const struct nlattr *actions,
                                       size_t actions_len);
-static void dp_netdev_input(struct dp_netdev *, struct emc_cache *,
+static void dp_netdev_input(struct dp_netdev_pmd_thread *,
                             struct dpif_packet **, int cnt,
                             struct pkt_metadata *);
-
-static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
 static void dp_netdev_disable_upcall(struct dp_netdev *);
+static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
+                                    struct dp_netdev *dp, int index,
+                                    int core_id, int numa_id);
+static struct dp_netdev_pmd_thread *dp_netdev_get_nonpmd(struct dp_netdev *dp);
+static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp);
+static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id);
+static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id);
 
 static void emc_clear_entry(struct emc_entry *ce);
 
@@ -525,6 +537,7 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     OVS_REQUIRES(dp_netdev_mutex)
 {
     struct dp_netdev *dp;
+    struct dp_netdev_pmd_thread *non_pmd;
     int error;
 
     dp = xzalloc(sizeof *dp);
@@ -544,7 +557,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     ovs_mutex_init(&dp->port_mutex);
     cmap_init(&dp->ports);
     dp->port_seq = seq_create();
-    latch_init(&dp->exit_latch);
     fat_rwlock_init(&dp->upcall_rwlock);
 
     /* Disable upcalls by default. */
@@ -552,6 +564,16 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
     dp->upcall_aux = NULL;
     dp->upcall_cb = NULL;
 
+    cmap_init(&dp->poll_threads);
+    ovs_mutex_init_recursive(&dp->non_pmd_mutex);
+    ovsthread_key_create(&dp->per_pmd_key, NULL);
+
+    /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */
+    ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID);
+    non_pmd = xzalloc(sizeof *non_pmd);
+    dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID,
+                            OVS_NUMA_UNSPEC);
+
     ovs_mutex_lock(&dp->port_mutex);
     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
     ovs_mutex_unlock(&dp->port_mutex);
@@ -560,9 +582,6 @@ create_dp_netdev(const char *name, const struct dpif_class *class,
         return error;
     }
 
-    ovs_mutex_init_recursive(&dp->emc_mutex);
-    emc_cache_init(&dp->flow_cache);
-
     *dpp = dp;
     return 0;
 }
@@ -604,8 +623,9 @@ dp_netdev_free(struct dp_netdev *dp)
 
     shash_find_and_delete(&dp_netdevs, dp->name);
 
-    dp_netdev_set_pmd_threads(dp, 0);
-    free(dp->pmd_threads);
+    dp_netdev_destroy_all_pmds(dp);
+    ovs_mutex_destroy(&dp->non_pmd_mutex);
+    ovsthread_key_delete(dp->per_pmd_key);
 
     dp_netdev_flow_flush(dp);
     ovs_mutex_lock(&dp->port_mutex);
@@ -626,10 +646,6 @@ dp_netdev_free(struct dp_netdev *dp)
     seq_destroy(dp->port_seq);
     cmap_destroy(&dp->ports);
     fat_rwlock_destroy(&dp->upcall_rwlock);
-    latch_destroy(&dp->exit_latch);
-
-    emc_cache_uninit(&dp->flow_cache);
-    ovs_mutex_destroy(&dp->emc_mutex);
 
     free(CONST_CAST(char *, dp->name));
     free(dp);
@@ -697,15 +713,22 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
 }
 
 static void
-dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
+dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
 {
-    int i;
+    int old_seq;
+
+    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
+}
 
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
-        int old_seq;
+/* Causes all pmd threads to reload its tx/rx devices.
+ * Must be called after adding/removing ports. */
+static void
+dp_netdev_reload_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
 
-        atomic_add_relaxed(&f->change_seq, 1, &old_seq);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_reload_pmd__(pmd);
     }
 }
 
@@ -793,9 +816,8 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
     port->sf = sf;
 
     if (netdev_is_pmd(netdev)) {
-        dp->pmd_count++;
-        dp_netdev_set_pmd_threads(dp, NR_PMD_THREADS);
-        dp_netdev_reload_pmd_threads(dp);
+        dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev));
+        dp_netdev_reload_pmds(dp);
     }
     ovs_refcount_init(&port->ref_cnt);
 
@@ -946,6 +968,39 @@ get_port_by_name(struct dp_netdev *dp,
     return ENOENT;
 }
 
+static int
+get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    int n_pmds = 0;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            n_pmds++;
+        }
+    }
+
+    return n_pmds;
+}
+
+/* Returns 'true' if there is a port with pmd netdev and the netdev
+ * is on numa node 'numa_id'. */
+static bool
+has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_port *port;
+
+    CMAP_FOR_EACH (port, node, &dp->ports) {
+        if (netdev_is_pmd(port->netdev)
+            && netdev_get_numa_id(port->netdev) == numa_id) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+
 static void
 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     OVS_REQUIRES(dp->port_mutex)
@@ -953,7 +1008,14 @@ do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
     seq_change(dp->port_seq);
     if (netdev_is_pmd(port->netdev)) {
-        dp_netdev_reload_pmd_threads(dp);
+        int numa_id = netdev_get_numa_id(port->netdev);
+
+        /* If there is no netdev on the numa node, deletes the pmd threads
+         * for that numa.  Else, just reloads the queues.  */
+        if (!has_pmd_port_for_numa(dp, numa_id)) {
+            dp_netdev_del_pmds_on_numa(dp, numa_id);
+        }
+        dp_netdev_reload_pmds(dp);
     }
 
     port_unref(port);
@@ -1710,8 +1772,10 @@ dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
 
 static int
 dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
+    OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *pmd;
     struct dpif_packet packet, *pp;
     struct pkt_metadata *md = &execute->md;
 
@@ -1723,11 +1787,24 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
     packet.ofpbuf = *execute->packet;
     pp = &packet;
 
-    ovs_mutex_lock(&dp->emc_mutex);
-    dp_netdev_execute_actions(dp, &pp, 1, false, md,
-                              &dp->flow_cache, execute->actions,
+    /* Tries finding the 'pmd'.  If NULL is returned, that means
+     * the current thread is a non-pmd thread and should use
+     * dp_netdev_get_nonpmd(). */
+    pmd = ovsthread_getspecific(dp->per_pmd_key);
+    if (!pmd) {
+        pmd = dp_netdev_get_nonpmd(dp);
+    }
+
+    /* If the current thread is non-pmd thread, acquires
+     * the 'non_pmd_mutex'. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_lock(&dp->non_pmd_mutex);
+    }
+    dp_netdev_execute_actions(pmd, &pp, 1, false, md, execute->actions,
                               execute->actions_len);
-    ovs_mutex_unlock(&dp->emc_mutex);
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        ovs_mutex_unlock(&dp->non_pmd_mutex);
+    }
 
     /* Even though may_steal is set to false, some actions could modify or
      * reallocate the ofpbuf memory. We need to pass those changes to the
@@ -1804,8 +1881,7 @@ dp_netdev_actions_free(struct dp_netdev_actions *actions)
 \f
 
 static void
-dp_netdev_process_rxq_port(struct dp_netdev *dp,
-                           struct emc_cache *flow_cache,
+dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd,
                            struct dp_netdev_port *port,
                            struct netdev_rxq *rxq)
 {
@@ -1817,7 +1893,7 @@ dp_netdev_process_rxq_port(struct dp_netdev *dp,
         struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no);
 
         *recirc_depth_get() = 0;
-        dp_netdev_input(dp, flow_cache, packets, cnt, &md);
+        dp_netdev_input(pmd, packets, cnt, &md);
     } else if (error != EAGAIN && error != EOPNOTSUPP) {
         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -1831,19 +1907,19 @@ dpif_netdev_run(struct dpif *dpif)
 {
     struct dp_netdev_port *port;
     struct dp_netdev *dp = get_dp_netdev(dpif);
+    struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_nonpmd(dp);
 
-    ovs_mutex_lock(&dp->emc_mutex);
+    ovs_mutex_lock(&dp->non_pmd_mutex);
     CMAP_FOR_EACH (port, node, &dp->ports) {
         if (!netdev_is_pmd(port->netdev)) {
             int i;
 
             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                dp_netdev_process_rxq_port(dp, &dp->flow_cache, port,
-                                           port->rxq[i]);
+                dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]);
             }
         }
     }
-    ovs_mutex_unlock(&dp->emc_mutex);
+    ovs_mutex_unlock(&dp->non_pmd_mutex);
 }
 
 static void
@@ -1871,33 +1947,32 @@ struct rxq_poll {
 };
 
 static int
-pmd_load_queues(struct pmd_thread *f,
+pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
                 struct rxq_poll **ppoll_list, int poll_cnt)
 {
-    struct dp_netdev *dp = f->dp;
     struct rxq_poll *poll_list = *ppoll_list;
     struct dp_netdev_port *port;
-    int id = f->id;
-    int index;
-    int i;
+    int n_pmds_on_numa, index, i;
 
     /* Simple scheduler for netdev rx polling. */
     for (i = 0; i < poll_cnt; i++) {
-         port_unref(poll_list[i].port);
+        port_unref(poll_list[i].port);
     }
 
     poll_cnt = 0;
+    n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id);
     index = 0;
 
-    CMAP_FOR_EACH (port, node, &f->dp->ports) {
+    CMAP_FOR_EACH (port, node, &pmd->dp->ports) {
         /* Calls port_try_ref() to prevent the main thread
          * from deleting the port. */
         if (port_try_ref(port)) {
-            if (netdev_is_pmd(port->netdev)) {
+            if (netdev_is_pmd(port->netdev)
+                && netdev_get_numa_id(port->netdev) == pmd->numa_id) {
                 int i;
 
                 for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
-                    if ((index % dp->n_pmd_threads) == id) {
+                    if ((index % n_pmds_on_numa) == pmd->index) {
                         poll_list = xrealloc(poll_list,
                                         sizeof *poll_list * (poll_cnt + 1));
 
@@ -1921,8 +1996,7 @@ pmd_load_queues(struct pmd_thread *f,
 static void *
 pmd_thread_main(void *f_)
 {
-    struct pmd_thread *f = f_;
-    struct dp_netdev *dp = f->dp;
+    struct dp_netdev_pmd_thread *pmd = f_;
     unsigned int lc = 0;
     struct rxq_poll *poll_list;
     unsigned int port_seq = PMD_INITIAL_SEQ;
@@ -1932,17 +2006,18 @@ pmd_thread_main(void *f_)
     poll_cnt = 0;
     poll_list = NULL;
 
-    pmd_thread_setaffinity_cpu(f->id);
+    /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
+    ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
+    pmd_thread_setaffinity_cpu(pmd->core_id);
 reload:
-    emc_cache_init(&f->flow_cache);
-    poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
+    emc_cache_init(&pmd->flow_cache);
+    poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt);
 
     for (;;) {
         int i;
 
         for (i = 0; i < poll_cnt; i++) {
-            dp_netdev_process_rxq_port(dp, &f->flow_cache, poll_list[i].port,
-                                       poll_list[i].rx);
+            dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx);
         }
 
         if (lc++ > 1024) {
@@ -1952,7 +2027,7 @@ reload:
 
             ovsrcu_quiesce();
 
-            atomic_read_relaxed(&f->change_seq, &seq);
+            atomic_read_relaxed(&pmd->change_seq, &seq);
             if (seq != port_seq) {
                 port_seq = seq;
                 break;
@@ -1960,9 +2035,9 @@ reload:
         }
     }
 
-    emc_cache_uninit(&f->flow_cache);
+    emc_cache_uninit(&pmd->flow_cache);
 
-    if (!latch_is_set(&f->dp->exit_latch)){
+    if (!latch_is_set(&pmd->exit_latch)){
         goto reload;
     }
 
@@ -2004,40 +2079,124 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
     dp_netdev_enable_upcall(dp);
 }
 
+/* Returns the pointer to the dp_netdev_pmd_thread for non-pmd threads. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_get_nonpmd(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+    struct cmap_node *pnode;
+
+    pnode = cmap_find(&dp->poll_threads, hash_int(NON_PMD_CORE_ID, 0));
+    ovs_assert(pnode);
+    pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node);
+
+    return pmd;
+}
+
+/* Configures the 'pmd' based on the input argument. */
+static void
+dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
+                        int index, int core_id, int numa_id)
+{
+    pmd->dp = dp;
+    pmd->index = index;
+    pmd->core_id = core_id;
+    pmd->numa_id = numa_id;
+    latch_init(&pmd->exit_latch);
+    atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
+    /* init the 'flow_cache' since there is no
+     * actual thread created for NON_PMD_CORE_ID. */
+    if (core_id == NON_PMD_CORE_ID) {
+        emc_cache_init(&pmd->flow_cache);
+    }
+    cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node),
+                hash_int(core_id, 0));
+}
+
+/* Stops the pmd thread, removes it from the 'dp->poll_threads'
+ * and destroys the struct. */
 static void
-dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
+dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd)
 {
-    int i;
+    /* Uninit the 'flow_cache' since there is
+     * no actual thread uninit it. */
+    if (pmd->core_id == NON_PMD_CORE_ID) {
+        emc_cache_uninit(&pmd->flow_cache);
+    } else {
+        latch_set(&pmd->exit_latch);
+        dp_netdev_reload_pmd__(pmd);
+        ovs_numa_unpin_core(pmd->core_id);
+        xpthread_join(pmd->thread, NULL);
+    }
+    cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
+    latch_destroy(&pmd->exit_latch);
+    free(pmd);
+}
 
-    if (n == dp->n_pmd_threads) {
-        return;
+/* Destroys all pmd threads. */
+static void
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
+{
+    struct dp_netdev_pmd_thread *pmd;
+
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        dp_netdev_del_pmd(pmd);
     }
+}
 
-    /* Stop existing threads. */
-    latch_set(&dp->exit_latch);
-    dp_netdev_reload_pmd_threads(dp);
-    for (i = 0; i < dp->n_pmd_threads; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+/* Deletes all pmd threads on numa node 'numa_id'. */
+static void
+dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    struct dp_netdev_pmd_thread *pmd;
 
-        xpthread_join(f->thread, NULL);
+    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+        if (pmd->numa_id == numa_id) {
+            dp_netdev_del_pmd(pmd);
+        }
     }
-    latch_poll(&dp->exit_latch);
-    free(dp->pmd_threads);
+}
 
-    /* Start new threads. */
-    dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
-    dp->n_pmd_threads = n;
+/* Checks the numa node id of 'netdev' and starts pmd threads for
+ * the numa node. */
+static void
+dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+    int n_pmds;
 
-    for (i = 0; i < n; i++) {
-        struct pmd_thread *f = &dp->pmd_threads[i];
+    if (!ovs_numa_numa_id_is_valid(numa_id)) {
+        VLOG_ERR("Cannot create pmd threads due to numa id (%d)"
+                 "invalid", numa_id);
+        return ;
+    }
+
+    n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
+
+    /* If there are already pmd threads created for the numa node
+     * in which 'netdev' is on, do nothing.  Else, creates the
+     * pmd threads for the numa node. */
+    if (!n_pmds) {
+        int can_have, n_unpinned, i;
+
+        n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
+        if (!n_unpinned) {
+            VLOG_ERR("Cannot create pmd threads due to out of unpinned "
+                     "cores on numa node");
+            return;
+        }
 
-        f->dp = dp;
-        f->id = i;
-        atomic_init(&f->change_seq, PMD_INITIAL_SEQ);
+        /* Tries creating NR_PMD_THREADS pmd threads on the numa node. */
+        can_have = MIN(n_unpinned, NR_PMD_THREADS);
+        for (i = 0; i < can_have; i++) {
+            struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd);
+            int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
 
-        /* Each thread will distribute all devices rx-queues among
-         * themselves. */
-        f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
+            dp_netdev_configure_pmd(pmd, dp, i, core_id, numa_id);
+            /* Each thread will distribute all devices rx-queues among
+             * themselves. */
+            pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd);
+        }
+        VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
     }
 }
 
@@ -2177,8 +2336,8 @@ packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow,
 }
 
 static inline void
-packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
-                     struct emc_cache *flow_cache)
+packet_batch_execute(struct packet_batch *batch,
+                     struct dp_netdev_pmd_thread *pmd)
 {
     struct dp_netdev_actions *actions;
     struct dp_netdev_flow *flow = batch->flow;
@@ -2188,11 +2347,10 @@ packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp,
 
     actions = dp_netdev_flow_get_actions(flow);
 
-    dp_netdev_execute_actions(dp, batch->packets, batch->packet_count, true,
-                              &batch->md, flow_cache,
-                              actions->actions, actions->size);
+    dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true,
+                              &batch->md, actions->actions, actions->size);
 
-    dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
+    dp_netdev_count_packet(pmd->dp, DP_STAT_HIT, batch->packet_count);
 }
 
 static inline bool
@@ -2247,12 +2405,13 @@ dpif_packet_swap(struct dpif_packet **a, struct dpif_packet **b)
  * 'packets' array (they have been moved to the beginning of the vector).
  */
 static inline size_t
-emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
-               struct dpif_packet **packets, size_t cnt,
-               struct pkt_metadata *md, struct netdev_flow_key *keys)
+emc_processing(struct dp_netdev_pmd_thread *pmd, struct dpif_packet **packets,
+               size_t cnt, struct pkt_metadata *md,
+               struct netdev_flow_key *keys)
 {
     struct netdev_flow_key key;
     struct packet_batch batches[4];
+    struct emc_cache *flow_cache = &pmd->flow_cache;
     size_t n_batches, i;
     size_t notfound_cnt = 0;
 
@@ -2285,14 +2444,14 @@ emc_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
     }
 
     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp, flow_cache);
+        packet_batch_execute(&batches[i], pmd);
     }
 
     return notfound_cnt;
 }
 
 static inline void
-fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
+fast_path_processing(struct dp_netdev_pmd_thread *pmd,
                      struct dpif_packet **packets, size_t cnt,
                      struct pkt_metadata *md, struct netdev_flow_key *keys)
 {
@@ -2305,6 +2464,8 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
     struct packet_batch batches[PKT_ARRAY_SIZE];
     const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets. */
     struct cls_rule *rules[PKT_ARRAY_SIZE];
+    struct dp_netdev *dp = pmd->dp;
+    struct emc_cache *flow_cache = &pmd->flow_cache;
     size_t n_batches, i;
     bool any_miss;
 
@@ -2353,8 +2514,8 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
             /* We can't allow the packet batching in the next loop to execute
              * the actions.  Otherwise, if there are any slow path actions,
              * we'll send the packet up twice. */
-            dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                      flow_cache, ofpbuf_data(&actions),
+            dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
+                                      ofpbuf_data(&actions),
                                       ofpbuf_size(&actions));
 
             add_actions = ofpbuf_size(&put_actions)
@@ -2391,18 +2552,19 @@ fast_path_processing(struct dp_netdev *dp, struct emc_cache *flow_cache,
         }
 
         flow = dp_netdev_flow_cast(rules[i]);
-        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet), flow);
+        emc_insert(flow_cache, mfs[i], dpif_packet_get_dp_hash(packet),
+                   flow);
         dp_netdev_queue_batches(packet, md, flow, mfs[i], batches, &n_batches,
                                 ARRAY_SIZE(batches));
     }
 
     for (i = 0; i < n_batches; i++) {
-        packet_batch_execute(&batches[i], dp, flow_cache);
+        packet_batch_execute(&batches[i], pmd);
     }
 }
 
 static void
-dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
+dp_netdev_input(struct dp_netdev_pmd_thread *pmd,
                 struct dpif_packet **packets, int cnt, struct pkt_metadata *md)
 {
 #if !defined(__CHECKER__) && !defined(_WIN32)
@@ -2414,15 +2576,14 @@ dp_netdev_input(struct dp_netdev *dp, struct emc_cache *flow_cache,
     struct netdev_flow_key keys[PKT_ARRAY_SIZE];
     size_t newcnt;
 
-    newcnt = emc_processing(dp, flow_cache, packets, cnt, md, keys);
+    newcnt = emc_processing(pmd, packets, cnt, md, keys);
     if (OVS_UNLIKELY(newcnt)) {
-        fast_path_processing(dp, flow_cache, packets, newcnt, md, keys);
+        fast_path_processing(pmd, packets, newcnt, md, keys);
     }
 }
 
 struct dp_netdev_execute_aux {
-    struct dp_netdev *dp;
-    struct emc_cache *flow_cache;
+    struct dp_netdev_pmd_thread *pmd;
 };
 
 static void
@@ -2442,7 +2603,8 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 {
     struct dp_netdev_execute_aux *aux = aux_;
     uint32_t *depth = recirc_depth_get();
-    struct dp_netdev *dp = aux->dp;
+    struct dp_netdev_pmd_thread *pmd= aux->pmd;
+    struct dp_netdev *dp= pmd->dp;
     int type = nl_attr_type(a);
     struct dp_netdev_port *p;
     int i;
@@ -2451,7 +2613,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
     case OVS_ACTION_ATTR_OUTPUT:
         p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a)));
         if (OVS_LIKELY(p)) {
-            netdev_send(p->netdev, NETDEV_QID_NONE, packets, cnt, may_steal);
+            netdev_send(p->netdev, pmd->core_id, packets, cnt, may_steal);
         } else if (may_steal) {
             for (i = 0; i < cnt; i++) {
                 dpif_packet_delete(packets[i]);
@@ -2478,8 +2640,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                                          DPIF_UC_ACTION, userdata, &actions,
                                          NULL);
                 if (!error || error == ENOSPC) {
-                    dp_netdev_execute_actions(dp, &packets[i], 1, false, md,
-                                              aux->flow_cache,
+                    dp_netdev_execute_actions(pmd, &packets[i], 1, false, md,
                                               ofpbuf_data(&actions),
                                               ofpbuf_size(&actions));
                 }
@@ -2541,7 +2702,7 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
                 /* Hash is private to each packet */
                 recirc_md.dp_hash = dpif_packet_get_dp_hash(packets[i]);
 
-                dp_netdev_input(dp, aux->flow_cache, &recirc_pkt, 1,
+                dp_netdev_input(pmd, &recirc_pkt, 1,
                                 &recirc_md);
             }
             (*depth)--;
@@ -2571,13 +2732,12 @@ dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
 }
 
 static void
-dp_netdev_execute_actions(struct dp_netdev *dp,
+dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
                           struct dpif_packet **packets, int cnt,
                           bool may_steal, struct pkt_metadata *md,
-                          struct emc_cache *flow_cache,
                           const struct nlattr *actions, size_t actions_len)
 {
-    struct dp_netdev_execute_aux aux = {dp, flow_cache};
+    struct dp_netdev_execute_aux aux = {pmd};
 
     odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
                         actions_len, dp_execute_cb);
index fb9d0e2..d811507 100644 (file)
@@ -40,10 +40,9 @@ static inline void dp_packet_pad(struct ofpbuf *b)
     }
 }
 
-#define NETDEV_QID_NONE INT_MAX
-
 #define NR_QUEUE   1
 #define NR_PMD_THREADS 1
+#define NON_PMD_CORE_ID 0
 
 #ifdef  __cplusplus
 }
index 1081564..0a6a398 100644 (file)
@@ -493,8 +493,7 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no)
 
     ovs_mutex_lock(&netdev->mutex);
 
-    /* XXX: need to discover device node at run time. */
-    netdev->socket_id = SOCKET0;
+    netdev->socket_id = rte_eth_dev_socket_id(port_no);
     netdev_dpdk_set_txq(netdev, NR_QUEUE);
     netdev->port_id = port_no;
     netdev->flags = 0;
@@ -860,8 +859,6 @@ netdev_dpdk_send(struct netdev *netdev, int qid, struct dpif_packet **pkts,
         int next_tx_idx = 0;
         int dropped = 0;
 
-        qid = rte_lcore_id();
-
         for (i = 0; i < cnt; i++) {
             int size = ofpbuf_size(&pkts[i]->ofpbuf);
             if (OVS_UNLIKELY(size > dev->max_packet_len)) {
@@ -1518,7 +1515,8 @@ pmd_thread_setaffinity_cpu(int cpu)
         return err;
     }
     /* lcore_id 0 is reseved for use by non pmd threads. */
-    RTE_PER_LCORE(_lcore_id) = cpu + 1;
+    ovs_assert(cpu);
+    RTE_PER_LCORE(_lcore_id) = cpu;
 
     return 0;
 }
@@ -1526,9 +1524,6 @@ pmd_thread_setaffinity_cpu(int cpu)
 void
 thread_set_nonpmd(void)
 {
-    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
-     * for non pmd threads */
-    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
     /* We have to use 0 to allow non pmd threads to perform certain DPDK
      * operations, like rte_eth_dev_configure(). */
     RTE_PER_LCORE(_lcore_id) = 0;