dpif-netdev: Create multiple tx/rx queues when adding dpdk interface.
authorAlex Wang <alexw@nicira.com>
Tue, 17 Jun 2014 17:52:20 +0000 (10:52 -0700)
committerAlex Wang <alexw@nicira.com>
Mon, 15 Sep 2014 18:43:48 +0000 (11:43 -0700)
Before this commit, ovs creates one tx and one rx queue for
each dpdk interface and uses only one poll thread for handling
I/O of all dpdk interfaces.  An upcoming patch will allow multiple
poll threads be created.  As a preparation, this commit changes
the dpif-netdev to create multiple tx/rx queues when the dpdk
interface is added.

Specifically, the number of rx queues will still be one per-dpdk
interface for this commit.  But upcoming work will allow user
create multiple rx queues.  The number of tx queues will be the
number of cpu cores on the machine.  Although not all the tx queues
will be used, each poll thread will have its own queue for
transmission on the dpdk interface.

Signed-off-by: Alex Wang <alexw@nicira.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
lib/dpif-netdev.c
lib/netdev-dpdk.c

index 5486dd6..12e3571 100644 (file)
@@ -52,6 +52,7 @@
 #include "odp-util.h"
 #include "ofp-print.h"
 #include "ofpbuf.h"
+#include "ovs-numa.h"
 #include "ovs-rcu.h"
 #include "packet-dpif.h"
 #include "packets.h"
@@ -744,6 +745,21 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         return EINVAL;
     }
 
+    if (netdev_is_pmd(netdev)) {
+        int n_cores = ovs_numa_get_n_cores();
+
+        if (n_cores == OVS_CORE_UNSPEC) {
+            VLOG_ERR("%s, cannot get cpu core info", devname);
+            return ENOENT;
+        }
+        /* There can only be ovs_numa_get_n_cores() pmd threads,
+         * so creates a tx_q for each. */
+        error = netdev_set_multiq(netdev, n_cores, NR_QUEUE);
+        if (error) {
+            VLOG_ERR("%s, cannot set multiq", devname);
+            return errno;
+        }
+    }
     port = xzalloc(sizeof *port);
     port->port_no = port_no;
     port->netdev = netdev;
index 1c1c6e6..e4672f1 100644 (file)
@@ -154,6 +154,8 @@ struct dpdk_mp {
     struct list list_node OVS_GUARDED_BY(dpdk_mutex);
 };
 
+/* There should be one 'struct dpdk_tx_queue' created for
+ * each cpu core. */
 struct dpdk_tx_queue {
     rte_spinlock_t tx_lock;
     int count;
@@ -182,7 +184,7 @@ struct netdev_dpdk {
     int port_id;
     int max_packet_len;
 
-    struct dpdk_tx_queue tx_q[NR_QUEUE];
+    struct dpdk_tx_queue *tx_q;
 
     struct ovs_mutex mutex OVS_ACQ_AFTER(dpdk_mutex);
 
@@ -461,23 +463,30 @@ netdev_dpdk_alloc(void)
     return &netdev->up;
 }
 
+static void
+netdev_dpdk_set_txq(struct netdev_dpdk *netdev, unsigned int n_txqs)
+{
+    int i;
+
+    netdev->tx_q = dpdk_rte_mzalloc(n_txqs * sizeof *netdev->tx_q);
+    for (i = 0; i < n_txqs; i++) {
+        rte_spinlock_init(&netdev->tx_q[i].tx_lock);
+    }
+}
+
 static int
-netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk_mutex)
+netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no)
+    OVS_REQUIRES(dpdk_mutex)
 {
     struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
     int err = 0;
-    int i;
 
     ovs_mutex_init(&netdev->mutex);
 
     ovs_mutex_lock(&netdev->mutex);
 
-    for (i = 0; i < NR_QUEUE; i++) {
-        rte_spinlock_init(&netdev->tx_q[i].tx_lock);
-    }
-
+    netdev_dpdk_set_txq(netdev, NR_QUEUE);
     netdev->port_id = port_no;
-
     netdev->flags = 0;
     netdev->mtu = ETHER_MTU;
     netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
@@ -501,6 +510,9 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk
     list_push_back(&dpdk_list, &netdev->list_node);
 
 unlock:
+    if (err) {
+        rte_free(netdev->tx_q);
+    }
     ovs_mutex_unlock(&netdev->mutex);
     return err;
 }
@@ -552,6 +564,7 @@ netdev_dpdk_destruct(struct netdev *netdev_)
     ovs_mutex_unlock(&dev->mutex);
 
     ovs_mutex_lock(&dpdk_mutex);
+    rte_free(dev->tx_q);
     list_remove(&dev->list_node);
     dpdk_mp_put(dev->dpdk_mp);
     ovs_mutex_unlock(&dpdk_mutex);
@@ -609,6 +622,10 @@ netdev_dpdk_set_multiq(struct netdev *netdev_, unsigned int n_txq,
     netdev->up.n_txq = n_txq;
     netdev->up.n_rxq = n_rxq;
     err = dpdk_eth_dev_init(netdev);
+    if (!err && netdev->up.n_txq != n_txq) {
+        rte_free(netdev->tx_q);
+        netdev_dpdk_set_txq(netdev, n_txq);
+    }
     ovs_mutex_unlock(&netdev->mutex);
 
     return err;
@@ -840,7 +857,7 @@ netdev_dpdk_send(struct netdev *netdev, int qid, struct dpif_packet **pkts,
         int next_tx_idx = 0;
         int dropped = 0;
 
-        qid = rte_lcore_id() % NR_QUEUE;
+        qid = rte_lcore_id();
 
         for (i = 0; i < cnt; i++) {
             int size = ofpbuf_size(&pkts[i]->ofpbuf);