netdev-dpdk: Fix DPDK rings broken by multi queue
authorDavid Verbeiren <david.verbeiren@intel.com>
Tue, 14 Oct 2014 17:01:49 +0000 (19:01 +0200)
committerPravin B Shelar <pshelar@nicira.com>
Tue, 4 Nov 2014 17:21:27 +0000 (09:21 -0800)
DPDK rings don't need one queue per PMD thread and don't support multiple
queues (set_multiq function is undefined). To fix operation with DPDK rings,
this patch ignores EOPNOTSUPP error on netdev_set_multiq() and provides, for
DPDK rings, a netdev send() function that ignores the provided queue id
(= PMD thread core id).

Suggested-by: Maryam Tahhan <maryam.tahhan@intel.com>
Signed-off-by: David Verbeiren <david.verbeiren@intel.com>
Acked-by: Pravin B Shelar <pshelar@nicira.com>
lib/dpif-netdev.c
lib/netdev-dpdk.c

index 2009206..4eaaa5e 100644 (file)
@@ -822,7 +822,7 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
         /* There can only be ovs_numa_get_n_cores() pmd threads,
          * so creates a txq for each. */
         error = netdev_set_multiq(netdev, n_cores, dp->n_dpdk_rxqs);
-        if (error) {
+        if (error && (error != EOPNOTSUPP)) {
             VLOG_ERR("%s, cannot set multiq", devname);
             return errno;
         }
@@ -2089,7 +2089,7 @@ dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask)
                 /* Sets the new rx queue config.  */
                 err = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(),
                                         n_rxqs);
-                if (err) {
+                if (err && (err != EOPNOTSUPP)) {
                     VLOG_ERR("Failed to set dpdk interface %s rx_queue to:"
                              " %u", netdev_get_name(port->netdev),
                              n_rxqs);
index 9c93768..6d37d0e 100644 (file)
@@ -202,6 +202,7 @@ struct netdev_dpdk {
 
     /* In dpdk_list. */
     struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+    rte_spinlock_t dpdkr_tx_lock;
 };
 
 struct netdev_rxq_dpdk {
@@ -502,6 +503,7 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no)
     netdev->flags = 0;
     netdev->mtu = ETHER_MTU;
     netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
+    rte_spinlock_init(&netdev->dpdkr_tx_lock);
 
     netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
     if (!netdev->dpdk_mp) {
@@ -845,15 +847,16 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dpif_packet ** pkts,
     }
 }
 
-static int
-netdev_dpdk_send(struct netdev *netdev, int qid, struct dpif_packet **pkts,
-                 int cnt, bool may_steal)
+static inline void
+netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
+                   struct dpif_packet **pkts, int cnt, bool may_steal)
 {
-    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    int ret;
     int i;
 
-    if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
+    if (OVS_UNLIKELY(!may_steal ||
+                     pkts[0]->ofpbuf.source != OFPBUF_DPDK)) {
+        struct netdev *netdev = &dev->up;
+
         dpdk_do_tx_copy(netdev, qid, pkts, cnt);
 
         if (may_steal) {
@@ -894,9 +897,16 @@ netdev_dpdk_send(struct netdev *netdev, int qid, struct dpif_packet **pkts,
             ovs_mutex_unlock(&dev->mutex);
         }
     }
-    ret = 0;
+}
+
+static int
+netdev_dpdk_eth_send(struct netdev *netdev, int qid,
+                     struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
 
-    return ret;
+    netdev_dpdk_send__(dev, qid, pkts, cnt, may_steal);
+    return 0;
 }
 
 static int
@@ -1291,12 +1301,15 @@ dpdk_ring_create(const char dev_name[], unsigned int port_no,
         return ENOMEM;
     }
 
+    /* XXX: Add support for multiquque ring. */
     err = snprintf(ring_name, 10, "%s_tx", dev_name);
     if (err < 0) {
         return -err;
     }
 
-    ivshmem->cring_tx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0, 0);
+    /* Create single consumer/producer rings, netdev does explicit locking. */
+    ivshmem->cring_tx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+                                        RING_F_SP_ENQ | RING_F_SC_DEQ);
     if (ivshmem->cring_tx == NULL) {
         rte_free(ivshmem);
         return ENOMEM;
@@ -1307,7 +1320,9 @@ dpdk_ring_create(const char dev_name[], unsigned int port_no,
         return -err;
     }
 
-    ivshmem->cring_rx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0, 0);
+    /* Create single consumer/producer rings, netdev does explicit locking. */
+    ivshmem->cring_rx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+                                        RING_F_SP_ENQ | RING_F_SC_DEQ);
     if (ivshmem->cring_rx == NULL) {
         rte_free(ivshmem);
         return ENOMEM;
@@ -1354,6 +1369,19 @@ dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id) OVS_REQUIRES(dp
     return dpdk_ring_create(dev_name, port_no, eth_port_id);
 }
 
+static int
+netdev_dpdk_ring_send(struct netdev *netdev, int qid OVS_UNUSED,
+                      struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+    /* DPDK Rings have a single TX queue, Therefore needs locking. */
+    rte_spinlock_lock(&dev->dpdkr_tx_lock);
+    netdev_dpdk_send__(dev, 0, pkts, cnt, may_steal);
+    rte_spinlock_unlock(&dev->dpdkr_tx_lock);
+    return 0;
+}
+
 static int
 netdev_dpdk_ring_construct(struct netdev *netdev)
 {
@@ -1378,7 +1406,7 @@ unlock_dpdk:
     return err;
 }
 
-#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT, MULTIQ)      \
+#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT, MULTIQ, SEND)      \
 {                                                             \
     NAME,                                                     \
     INIT,                       /* init */                    \
@@ -1395,7 +1423,7 @@ unlock_dpdk:
     netdev_dpdk_get_numa_id,    /* get_numa_id */             \
     MULTIQ,                     /* set_multiq */              \
                                                               \
-    netdev_dpdk_send,           /* send */                    \
+    SEND,                       /* send */                    \
     NULL,                       /* send_wait */               \
                                                               \
     netdev_dpdk_set_etheraddr,                                \
@@ -1481,14 +1509,16 @@ const struct netdev_class dpdk_class =
         "dpdk",
         dpdk_class_init,
         netdev_dpdk_construct,
-        netdev_dpdk_set_multiq);
+        netdev_dpdk_set_multiq,
+        netdev_dpdk_eth_send);
 
 const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
         "dpdkr",
         NULL,
         netdev_dpdk_ring_construct,
-        NULL);
+        NULL,
+        netdev_dpdk_ring_send);
 
 void
 netdev_dpdk_register(void)