list: Rename struct list to struct ovs_list
[cascardo/ovs.git] / lib / netdev-dpdk.c
index f2202b4..53fdb8e 100644 (file)
@@ -36,6 +36,7 @@
 #include "odp-util.h"
 #include "ofp-print.h"
 #include "ofpbuf.h"
+#include "ovs-numa.h"
 #include "ovs-thread.h"
 #include "ovs-rcu.h"
 #include "packet-dpif.h"
@@ -69,8 +70,6 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 #define MP_CACHE_SZ          (256 * 2)
 #define SOCKET0              0
 
-#define NON_PMD_THREAD_TX_QUEUE 0
-
 #define NIC_PORT_RX_Q_SIZE 2048  /* Size of Physical NIC RX Queue, Max (n+32<=4096)*/
 #define NIC_PORT_TX_Q_SIZE 2048  /* Size of Physical NIC TX Queue, Max (n+32<=4096)*/
 
@@ -96,7 +95,8 @@ static const struct rte_eth_conf port_conf = {
     .rx_adv_conf = {
         .rss_conf = {
             .rss_key = NULL,
-            .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6,
+            .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6
+                    | ETH_RSS_IPV4_UDP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
         },
     },
     .txmode = {
@@ -125,6 +125,8 @@ static const struct rte_eth_txconf tx_conf = {
 
 enum { MAX_RX_QUEUE_LEN = 192 };
 enum { MAX_TX_QUEUE_LEN = 384 };
+enum { DPDK_RING_SIZE = 256 };
+BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
 enum { DRAIN_TSC = 200000ULL };
 
 static int rte_eal_init_ret = ENODEV;
@@ -132,10 +134,10 @@ static int rte_eal_init_ret = ENODEV;
 static struct ovs_mutex dpdk_mutex = OVS_MUTEX_INITIALIZER;
 
 /* Contains all 'struct dpdk_dev's. */
-static struct list dpdk_list OVS_GUARDED_BY(dpdk_mutex)
+static struct ovs_list dpdk_list OVS_GUARDED_BY(dpdk_mutex)
     = LIST_INITIALIZER(&dpdk_list);
 
-static struct list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex)
+static struct ovs_list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex)
     = LIST_INITIALIZER(&dpdk_mp_list);
 
 /* This mutex must be used by non pmd threads when allocating or freeing
@@ -148,11 +150,14 @@ struct dpdk_mp {
     int mtu;
     int socket_id;
     int refcount;
-    struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+    struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
 };
 
+/* There should be one 'struct dpdk_tx_queue' created for
+ * each cpu core. */
 struct dpdk_tx_queue {
-    rte_spinlock_t tx_lock;
+    bool flush_tx;                 /* Set to true to flush queue everytime */
+                                   /* pkts are queued. */
     int count;
     uint64_t tsc;
     struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
@@ -162,7 +167,7 @@ struct dpdk_tx_queue {
    so we have to keep them around once they've been created
 */
 
-static struct list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
+static struct ovs_list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
     = LIST_INITIALIZER(&dpdk_ring_list);
 
 struct dpdk_ring {
@@ -171,7 +176,7 @@ struct dpdk_ring {
     struct rte_ring *cring_rx;
     int user_port_id; /* User given port no, parsed from port name */
     int eth_port_id; /* ethernet device port id */
-    struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+    struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
 };
 
 struct netdev_dpdk {
@@ -179,7 +184,7 @@ struct netdev_dpdk {
     int port_id;
     int max_packet_len;
 
-    struct dpdk_tx_queue tx_q[NR_QUEUE];
+    struct dpdk_tx_queue *tx_q;
 
     struct ovs_mutex mutex OVS_ACQ_AFTER(dpdk_mutex);
 
@@ -187,7 +192,6 @@ struct netdev_dpdk {
     int mtu;
     int socket_id;
     int buf_size;
-    struct netdev_stats stats_offset;
     struct netdev_stats stats;
 
     uint8_t hwaddr[ETH_ADDR_LEN];
@@ -197,7 +201,8 @@ struct netdev_dpdk {
     int link_reset_cnt;
 
     /* In dpdk_list. */
-    struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+    struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
+    rte_spinlock_t dpdkr_tx_lock;
 };
 
 struct netdev_rxq_dpdk {
@@ -300,7 +305,8 @@ dpdk_mp_get(int socket_id, int mtu) OVS_REQUIRES(dpdk_mutex)
     dmp->mtu = mtu;
     dmp->refcount = 1;
 
-    if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d", dmp->mtu) < 0) {
+    if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d_%d", dmp->mtu,
+                 dmp->socket_id) < 0) {
         return NULL;
     }
 
@@ -395,13 +401,14 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
         return ENODEV;
     }
 
-    diag = rte_eth_dev_configure(dev->port_id, NR_QUEUE, NR_QUEUE,  &port_conf);
+    diag = rte_eth_dev_configure(dev->port_id, dev->up.n_rxq, dev->up.n_txq,
+                                 &port_conf);
     if (diag) {
         VLOG_ERR("eth dev config error %d",diag);
         return -diag;
     }
 
-    for (i = 0; i < NR_QUEUE; i++) {
+    for (i = 0; i < dev->up.n_txq; i++) {
         diag = rte_eth_tx_queue_setup(dev->port_id, i, NIC_PORT_TX_Q_SIZE,
                                       dev->socket_id, &tx_conf);
         if (diag) {
@@ -410,7 +417,7 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
         }
     }
 
-    for (i = 0; i < NR_QUEUE; i++) {
+    for (i = 0; i < dev->up.n_rxq; i++) {
         diag = rte_eth_rx_queue_setup(dev->port_id, i, NIC_PORT_RX_Q_SIZE,
                                       dev->socket_id,
                                       &rx_conf, dev->dpdk_mp->mp);
@@ -457,29 +464,46 @@ netdev_dpdk_alloc(void)
     return &netdev->up;
 }
 
+static void
+netdev_dpdk_alloc_txq(struct netdev_dpdk *netdev, unsigned int n_txqs)
+{
+    int i;
+
+    netdev->tx_q = dpdk_rte_mzalloc(n_txqs * sizeof *netdev->tx_q);
+    /* Each index is considered as a cpu core id, since there should
+     * be one tx queue for each cpu core. */
+    for (i = 0; i < n_txqs; i++) {
+        int numa_id = ovs_numa_get_numa_id(i);
+
+        /* If the corresponding core is not on the same numa node
+         * as 'netdev', flags the 'flush_tx'. */
+        netdev->tx_q[i].flush_tx = netdev->socket_id == numa_id;
+    }
+}
+
 static int
-netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk_mutex)
+netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no)
+    OVS_REQUIRES(dpdk_mutex)
 {
     struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+    int sid;
     int err = 0;
-    int i;
 
     ovs_mutex_init(&netdev->mutex);
 
     ovs_mutex_lock(&netdev->mutex);
 
-    for (i = 0; i < NR_QUEUE; i++) {
-        rte_spinlock_init(&netdev->tx_q[i].tx_lock);
-    }
-
+    /* If the 'sid' is negative, it means that the kernel fails
+     * to obtain the pci numa info.  In that situation, always
+     * use 'SOCKET0'. */
+    sid = rte_eth_dev_socket_id(port_no);
+    netdev->socket_id = sid < 0 ? SOCKET0 : sid;
+    netdev_dpdk_alloc_txq(netdev, NR_QUEUE);
     netdev->port_id = port_no;
-
     netdev->flags = 0;
     netdev->mtu = ETHER_MTU;
     netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
-
-    /* XXX: need to discover device node at run time. */
-    netdev->socket_id = SOCKET0;
+    rte_spinlock_init(&netdev->dpdkr_tx_lock);
 
     netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
     if (!netdev->dpdk_mp) {
@@ -487,15 +511,19 @@ netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk
         goto unlock;
     }
 
+    netdev_->n_txq = NR_QUEUE;
+    netdev_->n_rxq = NR_QUEUE;
     err = dpdk_eth_dev_init(netdev);
     if (err) {
         goto unlock;
     }
-    netdev_->n_rxq = NR_QUEUE;
 
     list_push_back(&dpdk_list, &netdev->list_node);
 
 unlock:
+    if (err) {
+        rte_free(netdev->tx_q);
+    }
     ovs_mutex_unlock(&netdev->mutex);
     return err;
 }
@@ -547,6 +575,7 @@ netdev_dpdk_destruct(struct netdev *netdev_)
     ovs_mutex_unlock(&dev->mutex);
 
     ovs_mutex_lock(&dpdk_mutex);
+    rte_free(dev->tx_q);
     list_remove(&dev->list_node);
     dpdk_mp_put(dev->dpdk_mp);
     ovs_mutex_unlock(&dpdk_mutex);
@@ -569,14 +598,52 @@ netdev_dpdk_get_config(const struct netdev *netdev_, struct smap *args)
 
     ovs_mutex_lock(&dev->mutex);
 
-    /* XXX: Allow to configure number of queues. */
-    smap_add_format(args, "configured_rx_queues", "%u", netdev_->n_rxq);
-    smap_add_format(args, "configured_tx_queues", "%u", netdev_->n_rxq);
+    smap_add_format(args, "configured_rx_queues", "%d", netdev_->n_rxq);
+    smap_add_format(args, "configured_tx_queues", "%d", netdev_->n_txq);
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;
 }
 
+static int
+netdev_dpdk_get_numa_id(const struct netdev *netdev_)
+{
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+
+    return netdev->socket_id;
+}
+
+/* Sets the number of tx queues and rx queues for the dpdk interface.
+ * If the configuration fails, do not try restoring its old configuration
+ * and just returns the error. */
+static int
+netdev_dpdk_set_multiq(struct netdev *netdev_, unsigned int n_txq,
+                       unsigned int n_rxq)
+{
+    struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+    int err = 0;
+
+    if (netdev->up.n_txq == n_txq && netdev->up.n_rxq == n_rxq) {
+        return err;
+    }
+
+    ovs_mutex_lock(&dpdk_mutex);
+    ovs_mutex_lock(&netdev->mutex);
+
+    rte_eth_dev_stop(netdev->port_id);
+
+    netdev->up.n_txq = n_txq;
+    netdev->up.n_rxq = n_rxq;
+    rte_free(netdev->tx_q);
+    netdev_dpdk_alloc_txq(netdev, n_txq);
+    err = dpdk_eth_dev_init(netdev);
+
+    ovs_mutex_unlock(&netdev->mutex);
+    ovs_mutex_unlock(&dpdk_mutex);
+
+    return err;
+}
+
 static struct netdev_rxq *
 netdev_dpdk_rxq_alloc(void)
 {
@@ -621,9 +688,20 @@ static inline void
 dpdk_queue_flush__(struct netdev_dpdk *dev, int qid)
 {
     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
-    uint32_t nb_tx;
+    uint32_t nb_tx = 0;
+
+    while (nb_tx != txq->count) {
+        uint32_t ret;
+
+        ret = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts + nb_tx,
+                               txq->count - nb_tx);
+        if (!ret) {
+            break;
+        }
+
+        nb_tx += ret;
+    }
 
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
     if (OVS_UNLIKELY(nb_tx != txq->count)) {
         /* free buffers, which we couldn't transmit, one at a time (each
          * packet could come from a different mempool) */
@@ -632,7 +710,11 @@ dpdk_queue_flush__(struct netdev_dpdk *dev, int qid)
         for (i = nb_tx; i < txq->count; i++) {
             rte_pktmbuf_free_seg(txq->burst_pkts[i]);
         }
+        ovs_mutex_lock(&dev->mutex);
+        dev->stats.tx_dropped += txq->count-nb_tx;
+        ovs_mutex_unlock(&dev->mutex);
     }
+
     txq->count = 0;
     txq->tsc = rte_get_timer_cycles();
 }
@@ -645,9 +727,7 @@ dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
     if (txq->count == 0) {
         return;
     }
-    rte_spinlock_lock(&txq->tx_lock);
     dpdk_queue_flush__(dev, qid);
-    rte_spinlock_unlock(&txq->tx_lock);
 }
 
 static int
@@ -659,7 +739,11 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
     int nb_rx;
 
-    dpdk_queue_flush(dev, rxq_->queue_id);
+    /* There is only one tx queue for this core.  Do not flush other
+     * queueus. */
+    if (rxq_->queue_id == rte_lcore_id()) {
+        dpdk_queue_flush(dev, rxq_->queue_id);
+    }
 
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
                              (struct rte_mbuf **) packets,
@@ -683,7 +767,6 @@ dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
 
     int i = 0;
 
-    rte_spinlock_lock(&txq->tx_lock);
     while (i < cnt) {
         int freeslots = MAX_TX_QUEUE_LEN - txq->count;
         int tocopy = MIN(freeslots, cnt-i);
@@ -694,7 +777,7 @@ dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
         txq->count += tocopy;
         i += tocopy;
 
-        if (txq->count == MAX_TX_QUEUE_LEN) {
+        if (txq->count == MAX_TX_QUEUE_LEN || txq->flush_tx) {
             dpdk_queue_flush__(dev, qid);
         }
         diff_tsc = rte_get_timer_cycles() - txq->tsc;
@@ -702,12 +785,12 @@ dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
             dpdk_queue_flush__(dev, qid);
         }
     }
-    rte_spinlock_unlock(&txq->tx_lock);
 }
 
 /* Tx function. Transmit packets indefinitely */
 static void
-dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
+dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dpif_packet ** pkts,
+                int cnt)
     OVS_NO_THREAD_SAFETY_ANALYSIS
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
@@ -756,24 +839,25 @@ dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
         ovs_mutex_unlock(&dev->mutex);
     }
 
-    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
-    dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
+    dpdk_queue_pkts(dev, qid, mbufs, newcnt);
+    dpdk_queue_flush(dev, qid);
 
     if (!thread_is_pmd()) {
         ovs_mutex_unlock(&nonpmd_mempool_mutex);
     }
 }
 
-static int
-netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
-                 bool may_steal)
+static inline void
+netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
+                   struct dpif_packet **pkts, int cnt, bool may_steal)
 {
-    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    int ret;
     int i;
 
-    if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
-        dpdk_do_tx_copy(netdev, pkts, cnt);
+    if (OVS_UNLIKELY(!may_steal ||
+                     pkts[0]->ofpbuf.source != OFPBUF_DPDK)) {
+        struct netdev *netdev = &dev->up;
+
+        dpdk_do_tx_copy(netdev, qid, pkts, cnt);
 
         if (may_steal) {
             for (i = 0; i < cnt; i++) {
@@ -781,12 +865,9 @@ netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
             }
         }
     } else {
-        int qid;
         int next_tx_idx = 0;
         int dropped = 0;
 
-        qid = rte_lcore_id() % NR_QUEUE;
-
         for (i = 0; i < cnt; i++) {
             int size = ofpbuf_size(&pkts[i]->ofpbuf);
             if (OVS_UNLIKELY(size > dev->max_packet_len)) {
@@ -816,9 +897,16 @@ netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
             ovs_mutex_unlock(&dev->mutex);
         }
     }
-    ret = 0;
+}
 
-    return ret;
+static int
+netdev_dpdk_eth_send(struct netdev *netdev, int qid,
+                     struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+    netdev_dpdk_send__(dev, qid, pkts, cnt, may_steal);
+    return 0;
 }
 
 static int
@@ -923,29 +1011,17 @@ netdev_dpdk_get_stats(const struct netdev *netdev, struct netdev_stats *stats)
     ovs_mutex_lock(&dev->mutex);
     rte_eth_stats_get(dev->port_id, &rte_stats);
 
-    *stats = dev->stats_offset;
-
-    stats->rx_packets += rte_stats.ipackets;
-    stats->tx_packets += rte_stats.opackets;
-    stats->rx_bytes += rte_stats.ibytes;
-    stats->tx_bytes += rte_stats.obytes;
-    stats->rx_errors += rte_stats.ierrors;
-    stats->tx_errors += rte_stats.oerrors;
-    stats->multicast += rte_stats.imcasts;
-
-    stats->tx_dropped += dev->stats.tx_dropped;
-    ovs_mutex_unlock(&dev->mutex);
-
-    return 0;
-}
+    memset(stats, 0, sizeof(*stats));
 
-static int
-netdev_dpdk_set_stats(struct netdev *netdev, const struct netdev_stats *stats)
-{
-    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+    stats->rx_packets = rte_stats.ipackets;
+    stats->tx_packets = rte_stats.opackets;
+    stats->rx_bytes = rte_stats.ibytes;
+    stats->tx_bytes = rte_stats.obytes;
+    stats->rx_errors = rte_stats.ierrors;
+    stats->tx_errors = rte_stats.oerrors;
+    stats->multicast = rte_stats.imcasts;
 
-    ovs_mutex_lock(&dev->mutex);
-    dev->stats_offset = *stats;
+    stats->tx_dropped = dev->stats.tx_dropped;
     ovs_mutex_unlock(&dev->mutex);
 
     return 0;
@@ -1100,7 +1176,7 @@ netdev_dpdk_get_status(const struct netdev *netdev_, struct smap *args)
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev_);
     struct rte_eth_dev_info dev_info;
 
-    if (dev->port_id <= 0)
+    if (dev->port_id < 0)
         return ENODEV;
 
     ovs_mutex_lock(&dev->mutex);
@@ -1225,12 +1301,15 @@ dpdk_ring_create(const char dev_name[], unsigned int port_no,
         return ENOMEM;
     }
 
+    /* XXX: Add support for multiquque ring. */
     err = snprintf(ring_name, 10, "%s_tx", dev_name);
     if (err < 0) {
         return -err;
     }
 
-    ivshmem->cring_tx = rte_ring_create(ring_name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
+    /* Create single consumer/producer rings, netdev does explicit locking. */
+    ivshmem->cring_tx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+                                        RING_F_SP_ENQ | RING_F_SC_DEQ);
     if (ivshmem->cring_tx == NULL) {
         rte_free(ivshmem);
         return ENOMEM;
@@ -1241,7 +1320,9 @@ dpdk_ring_create(const char dev_name[], unsigned int port_no,
         return -err;
     }
 
-    ivshmem->cring_rx = rte_ring_create(ring_name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
+    /* Create single consumer/producer rings, netdev does explicit locking. */
+    ivshmem->cring_rx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+                                        RING_F_SP_ENQ | RING_F_SC_DEQ);
     if (ivshmem->cring_rx == NULL) {
         rte_free(ivshmem);
         return ENOMEM;
@@ -1288,6 +1369,19 @@ dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id) OVS_REQUIRES(dp
     return dpdk_ring_create(dev_name, port_no, eth_port_id);
 }
 
+static int
+netdev_dpdk_ring_send(struct netdev *netdev, int qid OVS_UNUSED,
+                      struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+    struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+    /* DPDK Rings have a single TX queue, Therefore needs locking. */
+    rte_spinlock_lock(&dev->dpdkr_tx_lock);
+    netdev_dpdk_send__(dev, 0, pkts, cnt, may_steal);
+    rte_spinlock_unlock(&dev->dpdkr_tx_lock);
+    return 0;
+}
+
 static int
 netdev_dpdk_ring_construct(struct netdev *netdev)
 {
@@ -1312,7 +1406,7 @@ unlock_dpdk:
     return err;
 }
 
-#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT)              \
+#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT, MULTIQ, SEND)      \
 {                                                             \
     NAME,                                                     \
     INIT,                       /* init */                    \
@@ -1326,8 +1420,13 @@ unlock_dpdk:
     netdev_dpdk_get_config,                                   \
     NULL,                       /* netdev_dpdk_set_config */  \
     NULL,                       /* get_tunnel_config */       \
+    NULL, /* build header */                                  \
+    NULL, /* push header */                                   \
+    NULL, /* pop header */                                    \
+    netdev_dpdk_get_numa_id,    /* get_numa_id */             \
+    MULTIQ,                     /* set_multiq */              \
                                                               \
-    netdev_dpdk_send,           /* send */                    \
+    SEND,                       /* send */                    \
     NULL,                       /* send_wait */               \
                                                               \
     netdev_dpdk_set_etheraddr,                                \
@@ -1339,7 +1438,6 @@ unlock_dpdk:
     netdev_dpdk_get_carrier_resets,                           \
     netdev_dpdk_set_miimon,                                   \
     netdev_dpdk_get_stats,                                    \
-    netdev_dpdk_set_stats,                                    \
     netdev_dpdk_get_features,                                 \
     NULL,                       /* set_advertisements */      \
                                                               \
@@ -1413,13 +1511,17 @@ const struct netdev_class dpdk_class =
     NETDEV_DPDK_CLASS(
         "dpdk",
         dpdk_class_init,
-        netdev_dpdk_construct);
+        netdev_dpdk_construct,
+        netdev_dpdk_set_multiq,
+        netdev_dpdk_eth_send);
 
 const struct netdev_class dpdk_ring_class =
     NETDEV_DPDK_CLASS(
         "dpdkr",
         NULL,
-        netdev_dpdk_ring_construct);
+        netdev_dpdk_ring_construct,
+        NULL,
+        netdev_dpdk_ring_send);
 
 void
 netdev_dpdk_register(void)
@@ -1452,7 +1554,8 @@ pmd_thread_setaffinity_cpu(int cpu)
         return err;
     }
     /* lcore_id 0 is reseved for use by non pmd threads. */
-    RTE_PER_LCORE(_lcore_id) = cpu + 1;
+    ovs_assert(cpu);
+    RTE_PER_LCORE(_lcore_id) = cpu;
 
     return 0;
 }
@@ -1460,9 +1563,6 @@ pmd_thread_setaffinity_cpu(int cpu)
 void
 thread_set_nonpmd(void)
 {
-    /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
-     * for non pmd threads */
-    BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
     /* We have to use 0 to allow non pmd threads to perform certain DPDK
      * operations, like rte_eth_dev_configure(). */
     RTE_PER_LCORE(_lcore_id) = 0;