netdev-dpdk: add dpdk vhost-user ports
[cascardo/ovs.git] / lib / netdev-dpdk.c
index a4775fd..3af1ee7 100644 (file)
@@ -16,7 +16,6 @@
 
 #include <config.h>
 
-#include <stdio.h>
 #include <string.h>
 #include <signal.h>
 #include <stdlib.h>
 #include <sched.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <sys/stat.h>
 #include <stdio.h>
+#include <sys/types.h>
+#include <sys/stat.h>
 
+#include "dirs.h"
 #include "dp-packet.h"
 #include "dpif-netdev.h"
 #include "list.h"
@@ -68,27 +71,35 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 #define MBUF_SIZE(mtu)       (MTU_TO_MAX_LEN(mtu) + (512) + \
                              sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
 
-/* XXX: mempool size should be based on system resources. */
-#define NB_MBUF              (4096 * 64)
-#define MP_CACHE_SZ          (256 * 2)
+/* Max and min number of packets in the mempool.  OVS tries to allocate a
+ * mempool with MAX_NB_MBUF: if this fails (because the system doesn't have
+ * enough hugepages) we keep halving the number until the allocation succeeds
+ * or we reach MIN_NB_MBUF */
+
+#define MAX_NB_MBUF          (4096 * 64)
+#define MIN_NB_MBUF          (4096 * 4)
+#define MP_CACHE_SZ          RTE_MEMPOOL_CACHE_MAX_SIZE
+
+/* MAX_NB_MBUF can be divided by 2 many times, until MIN_NB_MBUF */
+BUILD_ASSERT_DECL(MAX_NB_MBUF % ROUND_DOWN_POW2(MAX_NB_MBUF/MIN_NB_MBUF) == 0);
+
+/* The smallest possible NB_MBUF that we're going to try should be a multiple
+ * of MP_CACHE_SZ. This is advised by DPDK documentation. */
+BUILD_ASSERT_DECL((MAX_NB_MBUF / ROUND_DOWN_POW2(MAX_NB_MBUF/MIN_NB_MBUF))
+                  % MP_CACHE_SZ == 0);
+
 #define SOCKET0              0
 
 #define NIC_PORT_RX_Q_SIZE 2048  /* Size of Physical NIC RX Queue, Max (n+32<=4096)*/
 #define NIC_PORT_TX_Q_SIZE 2048  /* Size of Physical NIC TX Queue, Max (n+32<=4096)*/
 
-/* XXX: Needs per NIC value for these constants. */
-#define RX_PTHRESH 32 /* Default values of RX prefetch threshold reg. */
-#define RX_HTHRESH 32 /* Default values of RX host threshold reg. */
-#define RX_WTHRESH 16 /* Default values of RX write-back threshold reg. */
-
-#define TX_PTHRESH 36 /* Default values of TX prefetch threshold reg. */
-#define TX_HTHRESH 0  /* Default values of TX host threshold reg. */
-#define TX_WTHRESH 0  /* Default values of TX write-back threshold reg. */
+char *cuse_dev_name = NULL;    /* Character device cuse_dev_name. */
+char *vhost_sock_dir = NULL;   /* Location of vhost-user sockets */
 
-#define MAX_PKT_BURST 32           /* Max burst size for RX/TX */
-
-/* Character device cuse_dev_name. */
-char *cuse_dev_name = NULL;
+/*
+ * Maximum amount of time in micro seconds to try and enqueue to vhost.
+ */
+#define VHOST_ENQ_RETRY_USECS 100
 
 static const struct rte_eth_conf port_conf = {
     .rxmode = {
@@ -103,8 +114,7 @@ static const struct rte_eth_conf port_conf = {
     .rx_adv_conf = {
         .rss_conf = {
             .rss_key = NULL,
-            .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6
-                    | ETH_RSS_IPV4_UDP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
+            .rss_hf = ETH_RSS_IP | ETH_RSS_UDP | ETH_RSS_TCP,
         },
     },
     .txmode = {
@@ -112,26 +122,6 @@ static const struct rte_eth_conf port_conf = {
     },
 };
 
-static const struct rte_eth_rxconf rx_conf = {
-    .rx_thresh = {
-        .pthresh = RX_PTHRESH,
-        .hthresh = RX_HTHRESH,
-        .wthresh = RX_WTHRESH,
-    },
-};
-
-static const struct rte_eth_txconf tx_conf = {
-    .tx_thresh = {
-        .pthresh = TX_PTHRESH,
-        .hthresh = TX_HTHRESH,
-        .wthresh = TX_WTHRESH,
-    },
-    .tx_free_thresh = 0,
-    .tx_rs_thresh = 0,
-    .txq_flags = ETH_TXQ_FLAGS_NOMULTSEGS|ETH_TXQ_FLAGS_NOOFFLOADS,
-};
-
-enum { MAX_RX_QUEUE_LEN = 192 };
 enum { MAX_TX_QUEUE_LEN = 384 };
 enum { DPDK_RING_SIZE = 256 };
 BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
@@ -139,7 +129,7 @@ enum { DRAIN_TSC = 200000ULL };
 
 enum dpdk_dev_type {
     DPDK_DEV_ETH = 0,
-    DPDK_DEV_VHOST = 1
+    DPDK_DEV_VHOST = 1,
 };
 
 static int rte_eal_init_ret = ENODEV;
@@ -156,7 +146,7 @@ static struct ovs_list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex)
 /* This mutex must be used by non pmd threads when allocating or freeing
  * mbufs through mempools. Since dpdk_queue_pkts() and dpdk_queue_flush() may
  * use mempools, a non pmd thread should hold this mutex while calling them */
-struct ovs_mutex nonpmd_mempool_mutex = OVS_MUTEX_INITIALIZER;
+static struct ovs_mutex nonpmd_mempool_mutex = OVS_MUTEX_INITIALIZER;
 
 struct dpdk_mp {
     struct rte_mempool *mp;
@@ -172,6 +162,10 @@ struct dpdk_tx_queue {
     bool flush_tx;                 /* Set to true to flush queue everytime */
                                    /* pkts are queued. */
     int count;
+    rte_spinlock_t tx_lock;        /* Protects the members and the NIC queue
+                                    * from concurrent access.  It is used only
+                                    * if the queue is shared among different
+                                    * pmd threads (see 'txq_needs_locking'). */
     uint64_t tsc;
     struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
 };
@@ -207,6 +201,8 @@ struct netdev_dpdk {
     int socket_id;
     int buf_size;
     struct netdev_stats stats;
+    /* Protects stats */
+    rte_spinlock_t stats_lock;
 
     uint8_t hwaddr[ETH_ADDR_LEN];
     enum netdev_flags flags;
@@ -214,12 +210,25 @@ struct netdev_dpdk {
     struct rte_eth_link link;
     int link_reset_cnt;
 
+    /* The user might request more txqs than the NIC has.  We remap those
+     * ('up.n_txq') on these ('real_n_txq').
+     * If the numbers match, 'txq_needs_locking' is false, otherwise it is
+     * true and we will take a spinlock on transmission */
+    int real_n_txq;
+    bool txq_needs_locking;
+
+    /* Spinlock for vhost transmission.  Other DPDK devices use spinlocks in
+     * dpdk_tx_queue */
+    rte_spinlock_t vhost_tx_lock;
+
     /* virtio-net structure for vhost device */
     OVSRCU_TYPE(struct virtio_net *) virtio_dev;
 
+    /* Identifier used to distinguish vhost devices from each other */
+    char vhost_id[PATH_MAX];
+
     /* In dpdk_list. */
     struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
-    rte_spinlock_t txq_lock;
 };
 
 struct netdev_rxq_dpdk {
@@ -310,6 +319,7 @@ dpdk_mp_get(int socket_id, int mtu) OVS_REQUIRES(dpdk_mutex)
 {
     struct dpdk_mp *dmp = NULL;
     char mp_name[RTE_MEMPOOL_NAMESIZE];
+    unsigned mp_size;
 
     LIST_FOR_EACH (dmp, list_node, &dpdk_mp_list) {
         if (dmp->socket_id == socket_id && dmp->mtu == mtu) {
@@ -323,20 +333,25 @@ dpdk_mp_get(int socket_id, int mtu) OVS_REQUIRES(dpdk_mutex)
     dmp->mtu = mtu;
     dmp->refcount = 1;
 
-    if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d_%d", dmp->mtu,
-                 dmp->socket_id) < 0) {
-        return NULL;
-    }
+    mp_size = MAX_NB_MBUF;
+    do {
+        if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d_%d_%u",
+                     dmp->mtu, dmp->socket_id, mp_size) < 0) {
+            return NULL;
+        }
 
-    dmp->mp = rte_mempool_create(mp_name, NB_MBUF, MBUF_SIZE(mtu),
-                                 MP_CACHE_SZ,
-                                 sizeof(struct rte_pktmbuf_pool_private),
-                                 rte_pktmbuf_pool_init, NULL,
-                                 ovs_rte_pktmbuf_init, NULL,
-                                 socket_id, 0);
+        dmp->mp = rte_mempool_create(mp_name, mp_size, MBUF_SIZE(mtu),
+                                     MP_CACHE_SZ,
+                                     sizeof(struct rte_pktmbuf_pool_private),
+                                     rte_pktmbuf_pool_init, NULL,
+                                     ovs_rte_pktmbuf_init, NULL,
+                                     socket_id, 0);
+    } while (!dmp->mp && rte_errno == ENOMEM && (mp_size /= 2) >= MIN_NB_MBUF);
 
     if (dmp->mp == NULL) {
         return NULL;
+    } else {
+        VLOG_DBG("Allocated \"%s\" mempool with %u mbufs", mp_name, mp_size );
     }
 
     list_push_back(&dpdk_mp_list, &dmp->list_node);
@@ -411,6 +426,7 @@ static int
 dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
 {
     struct rte_pktmbuf_pool_private *mbp_priv;
+    struct rte_eth_dev_info info;
     struct ether_addr eth_addr;
     int diag;
     int i;
@@ -419,16 +435,21 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
         return ENODEV;
     }
 
-    diag = rte_eth_dev_configure(dev->port_id, dev->up.n_rxq, dev->up.n_txq,
+    rte_eth_dev_info_get(dev->port_id, &info);
+    dev->up.n_rxq = MIN(info.max_rx_queues, dev->up.n_rxq);
+    dev->real_n_txq = MIN(info.max_tx_queues, dev->up.n_txq);
+
+    diag = rte_eth_dev_configure(dev->port_id, dev->up.n_rxq, dev->real_n_txq,
                                  &port_conf);
     if (diag) {
-        VLOG_ERR("eth dev config error %d",diag);
+        VLOG_ERR("eth dev config error %d. rxq:%d txq:%d", diag, dev->up.n_rxq,
+                 dev->real_n_txq);
         return -diag;
     }
 
-    for (i = 0; i < dev->up.n_txq; i++) {
+    for (i = 0; i < dev->real_n_txq; i++) {
         diag = rte_eth_tx_queue_setup(dev->port_id, i, NIC_PORT_TX_Q_SIZE,
-                                      dev->socket_id, &tx_conf);
+                                      dev->socket_id, NULL);
         if (diag) {
             VLOG_ERR("eth dev tx queue setup error %d",diag);
             return -diag;
@@ -438,7 +459,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     for (i = 0; i < dev->up.n_rxq; i++) {
         diag = rte_eth_rx_queue_setup(dev->port_id, i, NIC_PORT_RX_Q_SIZE,
                                       dev->socket_id,
-                                      &rx_conf, dev->dpdk_mp->mp);
+                                      NULL, dev->dpdk_mp->mp);
         if (diag) {
             VLOG_ERR("eth dev rx queue setup error %d",diag);
             return -diag;
@@ -485,17 +506,23 @@ netdev_dpdk_alloc(void)
 static void
 netdev_dpdk_alloc_txq(struct netdev_dpdk *netdev, unsigned int n_txqs)
 {
-    int i;
+    unsigned i;
 
     netdev->tx_q = dpdk_rte_mzalloc(n_txqs * sizeof *netdev->tx_q);
-    /* Each index is considered as a cpu core id, since there should
-     * be one tx queue for each cpu core. */
     for (i = 0; i < n_txqs; i++) {
         int numa_id = ovs_numa_get_numa_id(i);
 
-        /* If the corresponding core is not on the same numa node
-         * as 'netdev', flags the 'flush_tx'. */
-        netdev->tx_q[i].flush_tx = netdev->socket_id == numa_id;
+        if (!netdev->txq_needs_locking) {
+            /* Each index is considered as a cpu core id, since there should
+             * be one tx queue for each cpu core.  If the corresponding core
+             * is not on the same numa node as 'netdev', flags the
+             * 'flush_tx'. */
+            netdev->tx_q[i].flush_tx = netdev->socket_id == numa_id;
+        } else {
+            /* Queues are shared among CPUs. Always flush */
+            netdev->tx_q[i].flush_tx = true;
+        }
+        rte_spinlock_init(&netdev->tx_q[i].tx_lock);
     }
 }
 
@@ -511,6 +538,8 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no,
     ovs_mutex_init(&netdev->mutex);
     ovs_mutex_lock(&netdev->mutex);
 
+    rte_spinlock_init(&netdev->stats_lock);
+
     /* If the 'sid' is negative, it means that the kernel fails
      * to obtain the pci numa info.  In that situation, always
      * use 'SOCKET0'. */
@@ -526,7 +555,6 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no,
     netdev->flags = 0;
     netdev->mtu = ETHER_MTU;
     netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
-    rte_spinlock_init(&netdev->txq_lock);
 
     netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
     if (!netdev->dpdk_mp) {
@@ -536,13 +564,14 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no,
 
     netdev_->n_txq = NR_QUEUE;
     netdev_->n_rxq = NR_QUEUE;
+    netdev->real_n_txq = NR_QUEUE;
 
     if (type == DPDK_DEV_ETH) {
-           netdev_dpdk_alloc_txq(netdev, NR_QUEUE);
-           err = dpdk_eth_dev_init(netdev);
-           if (err) {
-                   goto unlock;
-           }
+        netdev_dpdk_alloc_txq(netdev, NR_QUEUE);
+        err = dpdk_eth_dev_init(netdev);
+        if (err) {
+            goto unlock;
+        }
     }
 
     list_push_back(&dpdk_list, &netdev->list_node);
@@ -566,23 +595,56 @@ dpdk_dev_parse_name(const char dev_name[], const char prefix[],
     }
 
     cport = dev_name + strlen(prefix);
-    *port_no = strtol(cport, 0, 0); /* string must be null terminated */
+    *port_no = strtol(cport, NULL, 0); /* string must be null terminated */
     return 0;
 }
 
 static int
-netdev_dpdk_vhost_construct(struct netdev *netdev_)
+vhost_construct_helper(struct netdev *netdev_)
 {
-    int err;
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
 
     if (rte_eal_init_ret) {
         return rte_eal_init_ret;
     }
 
+    rte_spinlock_init(&netdev->vhost_tx_lock);
+    return netdev_dpdk_init(netdev_, -1, DPDK_DEV_VHOST);
+}
+
+static int
+netdev_dpdk_vhost_cuse_construct(struct netdev *netdev_)
+{
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+    int err;
+
     ovs_mutex_lock(&dpdk_mutex);
-    err = netdev_dpdk_init(netdev_, -1, DPDK_DEV_VHOST);
+    strncpy(netdev->vhost_id, netdev->up.name, sizeof(netdev->vhost_id));
+    err = vhost_construct_helper(netdev_);
     ovs_mutex_unlock(&dpdk_mutex);
+    return err;
+}
+
+static int
+netdev_dpdk_vhost_user_construct(struct netdev *netdev_)
+{
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+    int err;
 
+    ovs_mutex_lock(&dpdk_mutex);
+    /* Take the name of the vhost-user port and append it to the location where
+     * the socket is to be created, then register the socket.
+     */
+    snprintf(netdev->vhost_id, sizeof(netdev->vhost_id), "%s/%s",
+            vhost_sock_dir, netdev_->name);
+    err = rte_vhost_driver_register(netdev->vhost_id);
+    if (err) {
+        VLOG_ERR("vhost-user socket device setup failure for socket %s\n",
+                 netdev->vhost_id);
+    }
+    VLOG_INFO("Socket %s created for vhost-user port %s\n", netdev->vhost_id, netdev_->name);
+    err = vhost_construct_helper(netdev_);
+    ovs_mutex_unlock(&dpdk_mutex);
     return err;
 }
 
@@ -657,7 +719,8 @@ netdev_dpdk_get_config(const struct netdev *netdev_, struct smap *args)
     ovs_mutex_lock(&dev->mutex);
 
     smap_add_format(args, "configured_rx_queues", "%d", netdev_->n_rxq);
-    smap_add_format(args, "configured_tx_queues", "%d", netdev_->n_txq);
+    smap_add_format(args, "requested_tx_queues", "%d", netdev_->n_txq);
+    smap_add_format(args, "configured_tx_queues", "%d", dev->real_n_txq);
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;
@@ -694,8 +757,10 @@ netdev_dpdk_set_multiq(struct netdev *netdev_, unsigned int n_txq,
     netdev->up.n_rxq = n_rxq;
 
     rte_free(netdev->tx_q);
-    netdev_dpdk_alloc_txq(netdev, n_txq);
     err = dpdk_eth_dev_init(netdev);
+    netdev_dpdk_alloc_txq(netdev, netdev->real_n_txq);
+
+    netdev->txq_needs_locking = netdev->real_n_txq != netdev->up.n_txq;
 
     ovs_mutex_unlock(&netdev->mutex);
     ovs_mutex_unlock(&dpdk_mutex);
@@ -705,7 +770,7 @@ netdev_dpdk_set_multiq(struct netdev *netdev_, unsigned int n_txq,
 
 static int
 netdev_dpdk_vhost_set_multiq(struct netdev *netdev_, unsigned int n_txq,
-                       unsigned int n_rxq)
+                             unsigned int n_rxq)
 {
     struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
     int err = 0;
@@ -718,7 +783,8 @@ netdev_dpdk_vhost_set_multiq(struct netdev *netdev_, unsigned int n_txq,
     ovs_mutex_lock(&netdev->mutex);
 
     netdev->up.n_txq = n_txq;
-    netdev->up.n_rxq = n_rxq;
+    netdev->real_n_txq = 1;
+    netdev->up.n_rxq = 1;
 
     ovs_mutex_unlock(&netdev->mutex);
     ovs_mutex_unlock(&dpdk_mutex);
@@ -792,9 +858,9 @@ dpdk_queue_flush__(struct netdev_dpdk *dev, int qid)
         for (i = nb_tx; i < txq->count; i++) {
             rte_pktmbuf_free_seg(txq->burst_pkts[i]);
         }
-        ovs_mutex_lock(&dev->mutex);
+        rte_spinlock_lock(&dev->stats_lock);
         dev->stats.tx_dropped += txq->count-nb_tx;
-        ovs_mutex_unlock(&dev->mutex);
+        rte_spinlock_unlock(&dev->stats_lock);
     }
 
     txq->count = 0;
@@ -839,12 +905,15 @@ netdev_dpdk_vhost_rxq_recv(struct netdev_rxq *rxq_,
     nb_rx = rte_vhost_dequeue_burst(virtio_dev, qid,
                                     vhost_dev->dpdk_mp->mp,
                                     (struct rte_mbuf **)packets,
-                                    MAX_PKT_BURST);
+                                    NETDEV_MAX_BURST);
     if (!nb_rx) {
         return EAGAIN;
     }
 
+    rte_spinlock_lock(&vhost_dev->stats_lock);
     vhost_dev->stats.rx_packets += (uint64_t)nb_rx;
+    rte_spinlock_unlock(&vhost_dev->stats_lock);
+
     *c = (int) nb_rx;
     return 0;
 }
@@ -866,8 +935,7 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dp_packet **packets,
 
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
                              (struct rte_mbuf **) packets,
-                             MIN((int)NETDEV_MAX_RX_BATCH,
-                                 (int)MAX_RX_QUEUE_LEN));
+                             NETDEV_MAX_BURST);
     if (!nb_rx) {
         return EAGAIN;
     }
@@ -883,29 +951,68 @@ __netdev_dpdk_vhost_send(struct netdev *netdev, struct dp_packet **pkts,
 {
     struct netdev_dpdk *vhost_dev = netdev_dpdk_cast(netdev);
     struct virtio_net *virtio_dev = netdev_dpdk_get_virtio(vhost_dev);
-    int tx_pkts, i;
+    struct rte_mbuf **cur_pkts = (struct rte_mbuf **) pkts;
+    unsigned int total_pkts = cnt;
+    uint64_t start = 0;
 
     if (OVS_UNLIKELY(!is_vhost_running(virtio_dev))) {
-       ovs_mutex_lock(&vhost_dev->mutex);
-       vhost_dev->stats.tx_dropped+= cnt;
-       ovs_mutex_unlock(&vhost_dev->mutex);
-       goto out;
+        rte_spinlock_lock(&vhost_dev->stats_lock);
+        vhost_dev->stats.tx_dropped+= cnt;
+        rte_spinlock_unlock(&vhost_dev->stats_lock);
+        goto out;
     }
 
     /* There is vHost TX single queue, So we need to lock it for TX. */
-    rte_spinlock_lock(&vhost_dev->txq_lock);
-    tx_pkts = rte_vhost_enqueue_burst(virtio_dev, VIRTIO_RXQ,
-                                      (struct rte_mbuf **)pkts, cnt);
+    rte_spinlock_lock(&vhost_dev->vhost_tx_lock);
+
+    do {
+        unsigned int tx_pkts;
+
+        tx_pkts = rte_vhost_enqueue_burst(virtio_dev, VIRTIO_RXQ,
+                                          cur_pkts, cnt);
+        if (OVS_LIKELY(tx_pkts)) {
+            /* Packets have been sent.*/
+            cnt -= tx_pkts;
+            /* Prepare for possible next iteration.*/
+            cur_pkts = &cur_pkts[tx_pkts];
+        } else {
+            uint64_t timeout = VHOST_ENQ_RETRY_USECS * rte_get_timer_hz() / 1E6;
+            unsigned int expired = 0;
+
+            if (!start) {
+                start = rte_get_timer_cycles();
+            }
+
+            /*
+             * Unable to enqueue packets to vhost interface.
+             * Check available entries before retrying.
+             */
+            while (!rte_vring_available_entries(virtio_dev, VIRTIO_RXQ)) {
+                if (OVS_UNLIKELY((rte_get_timer_cycles() - start) > timeout)) {
+                    expired = 1;
+                    break;
+                }
+            }
+            if (expired) {
+                /* break out of main loop. */
+                break;
+            }
+        }
+    } while (cnt);
+    rte_spinlock_unlock(&vhost_dev->vhost_tx_lock);
 
-    vhost_dev->stats.tx_packets += tx_pkts;
-    vhost_dev->stats.tx_dropped += (cnt - tx_pkts);
-    rte_spinlock_unlock(&vhost_dev->txq_lock);
+    rte_spinlock_lock(&vhost_dev->stats_lock);
+    vhost_dev->stats.tx_packets += (total_pkts - cnt);
+    vhost_dev->stats.tx_dropped += cnt;
+    rte_spinlock_unlock(&vhost_dev->stats_lock);
 
 out:
     if (may_steal) {
-       for (i = 0; i < cnt; i++) {
-           dp_packet_delete(pkts[i]);
-       }
+        int i;
+
+        for (i = 0; i < total_pkts; i++) {
+            dp_packet_delete(pkts[i]);
+        }
     }
 }
 
@@ -944,8 +1051,14 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet **pkts,
                 int cnt)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
+#if !defined(__CHECKER__) && !defined(_WIN32)
+    const size_t PKT_ARRAY_SIZE = cnt;
+#else
+    /* Sparse or MSVC doesn't like variable length array. */
+    enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
+#endif
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct rte_mbuf *mbufs[cnt];
+    struct rte_mbuf *mbufs[PKT_ARRAY_SIZE];
     int dropped = 0;
     int newcnt = 0;
     int i;
@@ -985,9 +1098,9 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet **pkts,
     }
 
     if (OVS_UNLIKELY(dropped)) {
-        ovs_mutex_lock(&dev->mutex);
+        rte_spinlock_lock(&dev->stats_lock);
         dev->stats.tx_dropped += dropped;
-        ovs_mutex_unlock(&dev->mutex);
+        rte_spinlock_unlock(&dev->stats_lock);
     }
 
     if (dev->type == DPDK_DEV_VHOST) {
@@ -1027,6 +1140,11 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
 {
     int i;
 
+    if (OVS_UNLIKELY(dev->txq_needs_locking)) {
+        qid = qid % dev->real_n_txq;
+        rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
+    }
+
     if (OVS_UNLIKELY(!may_steal ||
                      pkts[0]->source != DPBUF_DPDK)) {
         struct netdev *netdev = &dev->up;
@@ -1044,6 +1162,7 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
 
         for (i = 0; i < cnt; i++) {
             int size = dp_packet_size(pkts[i]);
+
             if (OVS_UNLIKELY(size > dev->max_packet_len)) {
                 if (next_tx_idx != i) {
                     dpdk_queue_pkts(dev, qid,
@@ -1066,11 +1185,15 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
         }
 
         if (OVS_UNLIKELY(dropped)) {
-            ovs_mutex_lock(&dev->mutex);
+            rte_spinlock_lock(&dev->stats_lock);
             dev->stats.tx_dropped += dropped;
-            ovs_mutex_unlock(&dev->mutex);
+            rte_spinlock_unlock(&dev->stats_lock);
         }
     }
+
+    if (OVS_UNLIKELY(dev->txq_needs_locking)) {
+        rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
+    }
 }
 
 static int
@@ -1203,10 +1326,12 @@ netdev_dpdk_vhost_get_stats(const struct netdev *netdev,
     stats->rx_dropped += UINT64_MAX;
     stats->tx_bytes += UINT64_MAX;
 
+    rte_spinlock_lock(&dev->stats_lock);
     /* Supported Stats */
     stats->rx_packets += dev->stats.rx_packets;
     stats->tx_packets += dev->stats.tx_packets;
     stats->tx_dropped += dev->stats.tx_dropped;
+    rte_spinlock_unlock(&dev->stats_lock);
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;
@@ -1233,7 +1358,9 @@ netdev_dpdk_get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     stats->tx_errors = rte_stats.oerrors;
     stats->multicast = rte_stats.imcasts;
 
+    rte_spinlock_lock(&dev->stats_lock);
     stats->tx_dropped = dev->stats.tx_dropped;
+    rte_spinlock_unlock(&dev->stats_lock);
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;
@@ -1516,7 +1643,7 @@ new_device(struct virtio_net *dev)
     ovs_mutex_lock(&dpdk_mutex);
     /* Add device to the vhost port with the same name as that passed down. */
     LIST_FOR_EACH(netdev, list_node, &dpdk_list) {
-        if (strncmp(dev->ifname, netdev->up.name, IFNAMSIZ) == 0) {
+        if (strncmp(dev->ifname, netdev->vhost_id, IF_NAME_SZ) == 0) {
             ovs_mutex_lock(&netdev->mutex);
             ovsrcu_set(&netdev->virtio_dev, dev);
             ovs_mutex_unlock(&netdev->mutex);
@@ -1566,6 +1693,11 @@ destroy_device(volatile struct virtio_net *dev)
              * setting the virtio_dev to NULL.
              */
             ovsrcu_synchronize();
+            /*
+             * As call to ovsrcu_synchronize() will end the quiescent state,
+             * put thread back into quiescent state before returning.
+             */
+            ovsrcu_quiesce_start();
         }
     }
     ovs_mutex_unlock(&dpdk_mutex);
@@ -1584,16 +1716,18 @@ netdev_dpdk_get_virtio(const struct netdev_dpdk *dev)
  * These callbacks allow virtio-net devices to be added to vhost ports when
  * configuration has been fully complete.
  */
-const struct virtio_net_device_ops virtio_net_device_ops =
+static const struct virtio_net_device_ops virtio_net_device_ops =
 {
     .new_device =  new_device,
     .destroy_device = destroy_device,
 };
 
 static void *
-start_cuse_session_loop(void *dummy OVS_UNUSED)
+start_vhost_loop(void *dummy OVS_UNUSED)
 {
      pthread_detach(pthread_self());
+     /* Put the cuse thread into quiescent state. */
+     ovsrcu_quiesce_start();
      rte_vhost_driver_session_start();
      return NULL;
 }
@@ -1601,10 +1735,16 @@ start_cuse_session_loop(void *dummy OVS_UNUSED)
 static int
 dpdk_vhost_class_init(void)
 {
-    pthread_t thread;
+    rte_vhost_driver_callback_register(&virtio_net_device_ops);
+    ovs_thread_create("vhost_thread", start_vhost_loop, NULL);
+    return 0;
+}
+
+static int
+dpdk_vhost_cuse_class_init(void)
+{
     int err = -1;
 
-    rte_vhost_driver_callback_register(&virtio_net_device_ops);
 
     /* Register CUSE device to handle IOCTLs.
      * Unless otherwise specified on the vswitchd command line, cuse_dev_name
@@ -1617,9 +1757,15 @@ dpdk_vhost_class_init(void)
         return -1;
     }
 
-    /* start_cuse_session_loop blocks OVS RCU quiescent state, so directly use
-     * pthread API. */
-    return pthread_create(&thread, NULL, start_cuse_session_loop, NULL);
+    dpdk_vhost_class_init();
+    return 0;
+}
+
+static int
+dpdk_vhost_user_class_init(void)
+{
+    dpdk_vhost_class_init();
+    return 0;
 }
 
 static void
@@ -1716,15 +1862,21 @@ dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id) OVS_REQUIRES(dp
 }
 
 static int
-netdev_dpdk_ring_send(struct netdev *netdev, int qid OVS_UNUSED,
+netdev_dpdk_ring_send(struct netdev *netdev_, int qid,
                       struct dp_packet **pkts, int cnt, bool may_steal)
 {
-    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+    unsigned i;
 
-    /* DPDK Rings have a single TX queue, Therefore needs locking. */
-    rte_spinlock_lock(&dev->txq_lock);
-    netdev_dpdk_send__(dev, 0, pkts, cnt, may_steal);
-    rte_spinlock_unlock(&dev->txq_lock);
+    /* When using 'dpdkr' and sending to a DPDK ring, we want to ensure that the
+     * rss hash field is clear. This is because the same mbuf may be modified by
+     * the consumer of the ring and return into the datapath without recalculating
+     * the RSS hash. */
+    for (i = 0; i < cnt; i++) {
+        dp_packet_set_rss_hash(pkts[i], 0);
+    }
+
+    netdev_dpdk_send__(netdev, qid, pkts, cnt, may_steal);
     return 0;
 }
 
@@ -1821,6 +1973,33 @@ unlock_dpdk:
     NULL,                       /* rxq_drain */               \
 }
 
+static int
+process_vhost_flags(char *flag, char *default_val, int size,
+                    char **argv, char **new_val)
+{
+    int changed = 0;
+
+    /* Depending on which version of vhost is in use, process the vhost-specific
+     * flag if it is provided on the vswitchd command line, otherwise resort to
+     * a default value.
+     *
+     * For vhost-user: Process "-cuse_dev_name" to set the custom location of
+     * the vhost-user socket(s).
+     * For vhost-cuse: Process "-vhost_sock_dir" to set the custom name of the
+     * vhost-cuse character device.
+     */
+    if (!strcmp(argv[1], flag) && (strlen(argv[2]) <= size)) {
+        changed = 1;
+        *new_val = strdup(argv[2]);
+        VLOG_INFO("User-provided %s in use: %s", flag, *new_val);
+    } else {
+        VLOG_INFO("No %s provided - defaulting to %s", flag, default_val);
+        *new_val = default_val;
+    }
+
+    return changed;
+}
+
 int
 dpdk_init(int argc, char **argv)
 {
@@ -1835,27 +2014,29 @@ dpdk_init(int argc, char **argv)
     argc--;
     argv++;
 
-    /* If the cuse_dev_name parameter has been provided, set 'cuse_dev_name' to
-     * this string if it meets the correct criteria. Otherwise, set it to the
-     * default (vhost-net).
-     */
-    if (!strcmp(argv[1], "--cuse_dev_name") &&
-        (strlen(argv[2]) <= NAME_MAX)) {
-
-        cuse_dev_name = strdup(argv[2]);
-
-        /* Remove the cuse_dev_name configuration parameters from the argument
+#ifdef VHOST_CUSE
+    if (process_vhost_flags("-cuse_dev_name", strdup("vhost-net"),
+                            PATH_MAX, argv, &cuse_dev_name)) {
+#else
+    if (process_vhost_flags("-vhost_sock_dir", strdup(ovs_rundir()),
+                            NAME_MAX, argv, &vhost_sock_dir)) {
+        struct stat s;
+        int err;
+
+        err = stat(vhost_sock_dir, &s);
+        if (err) {
+            VLOG_ERR("vHostUser socket DIR '%s' does not exist.",
+                     vhost_sock_dir);
+            return err;
+        }
+#endif
+        /* Remove the vhost flag configuration parameters from the argument
          * list, so that the correct elements are passed to the DPDK
          * initialization function
          */
         argc -= 2;
-        argv += 2;    /* Increment by two to bypass the cuse_dev_name arguments */
+        argv += 2;    /* Increment by two to bypass the vhost flag arguments */
         base = 2;
-
-        VLOG_ERR("User-provided cuse_dev_name in use: /dev/%s", cuse_dev_name);
-    } else {
-        cuse_dev_name = "vhost-net";
-        VLOG_INFO("No cuse_dev_name provided - defaulting to /dev/vhost-net");
     }
 
     /* Keep the program name argument as this is needed for call to
@@ -1877,12 +2058,12 @@ dpdk_init(int argc, char **argv)
     }
 
     /* We are called from the main thread here */
-    thread_set_nonpmd();
+    RTE_PER_LCORE(_lcore_id) = NON_PMD_CORE_ID;
 
     return result + 1 + base;
 }
 
-const struct netdev_class dpdk_class =
+static const struct netdev_class dpdk_class =
     NETDEV_DPDK_CLASS(
         "dpdk",
         NULL,
@@ -1896,13 +2077,13 @@ const struct netdev_class dpdk_class =
         netdev_dpdk_get_status,
         netdev_dpdk_rxq_recv);
 
-const struct netdev_class dpdk_ring_class =
+static const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
         "dpdkr",
         NULL,
         netdev_dpdk_ring_construct,
         netdev_dpdk_destruct,
-        NULL,
+        netdev_dpdk_set_multiq,
         netdev_dpdk_ring_send,
         netdev_dpdk_get_carrier,
         netdev_dpdk_get_stats,
@@ -1910,11 +2091,25 @@ const struct netdev_class dpdk_ring_class =
         netdev_dpdk_get_status,
         netdev_dpdk_rxq_recv);
 
-const struct netdev_class dpdk_vhost_class =
+static const struct netdev_class dpdk_vhost_cuse_class =
     NETDEV_DPDK_CLASS(
-        "dpdkvhost",
-        dpdk_vhost_class_init,
-        netdev_dpdk_vhost_construct,
+        "dpdkvhostcuse",
+        dpdk_vhost_cuse_class_init,
+        netdev_dpdk_vhost_cuse_construct,
+        netdev_dpdk_vhost_destruct,
+        netdev_dpdk_vhost_set_multiq,
+        netdev_dpdk_vhost_send,
+        netdev_dpdk_vhost_get_carrier,
+        netdev_dpdk_vhost_get_stats,
+        NULL,
+        NULL,
+        netdev_dpdk_vhost_rxq_recv);
+
+const struct netdev_class dpdk_vhost_user_class =
+    NETDEV_DPDK_CLASS(
+        "dpdkvhostuser",
+        dpdk_vhost_user_class_init,
+        netdev_dpdk_vhost_user_construct,
         netdev_dpdk_vhost_destruct,
         netdev_dpdk_vhost_set_multiq,
         netdev_dpdk_vhost_send,
@@ -1937,13 +2132,17 @@ netdev_dpdk_register(void)
         dpdk_common_init();
         netdev_register_provider(&dpdk_class);
         netdev_register_provider(&dpdk_ring_class);
-        netdev_register_provider(&dpdk_vhost_class);
+#ifdef VHOST_CUSE
+        netdev_register_provider(&dpdk_vhost_cuse_class);
+#else
+        netdev_register_provider(&dpdk_vhost_user_class);
+#endif
         ovsthread_once_done(&once);
     }
 }
 
 int
-pmd_thread_setaffinity_cpu(int cpu)
+pmd_thread_setaffinity_cpu(unsigned cpu)
 {
     cpu_set_t cpuset;
     int err;
@@ -1962,14 +2161,6 @@ pmd_thread_setaffinity_cpu(int cpu)
     return 0;
 }
 
-void
-thread_set_nonpmd(void)
-{
-    /* We have to use NON_PMD_CORE_ID to allow non-pmd threads to perform
-     * certain DPDK operations, like rte_eth_dev_configure(). */
-    RTE_PER_LCORE(_lcore_id) = NON_PMD_CORE_ID;
-}
-
 static bool
 thread_is_pmd(void)
 {