netlink-socket: Refill comment to fit within 79 columns.
[cascardo/ovs.git] / lib / netdev-dpdk.c
index ba41d2e..a9f7b7c 100644 (file)
@@ -38,6 +38,7 @@
 #include "ofpbuf.h"
 #include "ovs-thread.h"
 #include "ovs-rcu.h"
+#include "packet-dpif.h"
 #include "packets.h"
 #include "shash.h"
 #include "sset.h"
@@ -70,6 +71,9 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 
 #define NON_PMD_THREAD_TX_QUEUE 0
 
+#define NIC_PORT_RX_Q_SIZE 2048  /* Size of Physical NIC RX Queue, Max (n+32<=4096)*/
+#define NIC_PORT_TX_Q_SIZE 2048  /* Size of Physical NIC TX Queue, Max (n+32<=4096)*/
+
 /* TODO: Needs per NIC value for these constants. */
 #define RX_PTHRESH 32 /* Default values of RX prefetch threshold reg. */
 #define RX_HTHRESH 32 /* Default values of RX host threshold reg. */
@@ -80,42 +84,43 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 #define TX_WTHRESH 0  /* Default values of TX write-back threshold reg. */
 
 static const struct rte_eth_conf port_conf = {
-        .rxmode = {
-                .mq_mode = ETH_MQ_RX_RSS,
-                .split_hdr_size = 0,
-                .header_split   = 0, /* Header Split disabled */
-                .hw_ip_checksum = 0, /* IP checksum offload disabled */
-                .hw_vlan_filter = 0, /* VLAN filtering disabled */
-                .jumbo_frame    = 0, /* Jumbo Frame Support disabled */
-                .hw_strip_crc   = 0,
-        },
-        .rx_adv_conf = {
-                .rss_conf = {
-                        .rss_key = NULL,
-                        .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6,
-                },
-        },
-        .txmode = {
-                .mq_mode = ETH_MQ_TX_NONE,
+    .rxmode = {
+        .mq_mode = ETH_MQ_RX_RSS,
+        .split_hdr_size = 0,
+        .header_split   = 0, /* Header Split disabled */
+        .hw_ip_checksum = 0, /* IP checksum offload disabled */
+        .hw_vlan_filter = 0, /* VLAN filtering disabled */
+        .jumbo_frame    = 0, /* Jumbo Frame Support disabled */
+        .hw_strip_crc   = 0,
+    },
+    .rx_adv_conf = {
+        .rss_conf = {
+            .rss_key = NULL,
+            .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6,
         },
+    },
+    .txmode = {
+        .mq_mode = ETH_MQ_TX_NONE,
+    },
 };
 
 static const struct rte_eth_rxconf rx_conf = {
-        .rx_thresh = {
-                .pthresh = RX_PTHRESH,
-                .hthresh = RX_HTHRESH,
-                .wthresh = RX_WTHRESH,
-        },
+    .rx_thresh = {
+        .pthresh = RX_PTHRESH,
+        .hthresh = RX_HTHRESH,
+        .wthresh = RX_WTHRESH,
+    },
 };
 
 static const struct rte_eth_txconf tx_conf = {
-        .tx_thresh = {
-                .pthresh = TX_PTHRESH,
-                .hthresh = TX_HTHRESH,
-                .wthresh = TX_WTHRESH,
-        },
-        .tx_free_thresh = 0,
-        .tx_rs_thresh = 0,
+    .tx_thresh = {
+        .pthresh = TX_PTHRESH,
+        .hthresh = TX_HTHRESH,
+        .wthresh = TX_WTHRESH,
+    },
+    .tx_free_thresh = 0,
+    .tx_rs_thresh = 0,
+    .txq_flags = ETH_TXQ_FLAGS_NOMULTSEGS|ETH_TXQ_FLAGS_NOOFFLOADS,
 };
 
 enum { MAX_RX_QUEUE_LEN = 64 };
@@ -203,9 +208,10 @@ dpdk_rte_mzalloc(size_t sz)
 }
 
 void
-free_dpdk_buf(struct ofpbuf *b)
+free_dpdk_buf(struct dpif_packet *p)
 {
-    struct rte_mbuf *pkt = (struct rte_mbuf *) b;
+    struct ofpbuf *buf = &p->ofpbuf;
+    struct rte_mbuf *pkt = (struct rte_mbuf *) buf->dpdk_buf;
 
     rte_mempool_put(pkt->pool, pkt);
 }
@@ -217,16 +223,16 @@ __rte_pktmbuf_init(struct rte_mempool *mp,
                    unsigned i OVS_UNUSED)
 {
     struct rte_mbuf *m = _m;
-    uint32_t buf_len = mp->elt_size - sizeof(struct ofpbuf);
+    uint32_t buf_len = mp->elt_size - sizeof(struct dpif_packet);
 
-    RTE_MBUF_ASSERT(mp->elt_size >= sizeof(struct ofpbuf));
+    RTE_MBUF_ASSERT(mp->elt_size >= sizeof(struct dpif_packet));
 
     memset(m, 0, mp->elt_size);
 
     /* start of buffer is just after mbuf structure */
-    m->buf_addr = (char *)m + sizeof(struct ofpbuf);
+    m->buf_addr = (char *)m + sizeof(struct dpif_packet);
     m->buf_physaddr = rte_mempool_virt2phy(mp, m) +
-                    sizeof(struct ofpbuf);
+                    sizeof(struct dpif_packet);
     m->buf_len = (uint16_t)buf_len;
 
     /* keep some headroom between start of buffer and data */
@@ -369,8 +375,8 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     }
 
     for (i = 0; i < NR_QUEUE; i++) {
-        diag = rte_eth_tx_queue_setup(dev->port_id, i, MAX_TX_QUEUE_LEN, 0,
-                                      &tx_conf);
+        diag = rte_eth_tx_queue_setup(dev->port_id, i, NIC_PORT_TX_Q_SIZE,
+                                      dev->socket_id, &tx_conf);
         if (diag) {
             VLOG_ERR("eth dev tx queue setup error %d",diag);
             return diag;
@@ -378,7 +384,8 @@ dpdk_eth_dev_init(struct netdev_dpdk *dev) OVS_REQUIRES(dpdk_mutex)
     }
 
     for (i = 0; i < NR_QUEUE; i++) {
-        diag = rte_eth_rx_queue_setup(dev->port_id, i, MAX_RX_QUEUE_LEN, 0,
+        diag = rte_eth_rx_queue_setup(dev->port_id, i, NIC_PORT_RX_Q_SIZE,
+                                      dev->socket_id,
                                       &rx_conf, dev->dpdk_mp->mp);
         if (diag) {
             VLOG_ERR("eth dev rx queue setup error %d",diag);
@@ -563,29 +570,39 @@ netdev_dpdk_rxq_dealloc(struct netdev_rxq *rxq_)
     rte_free(rx);
 }
 
-inline static void
-dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
+static inline void
+dpdk_queue_flush__(struct netdev_dpdk *dev, int qid)
 {
     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint32_t nb_tx;
 
-    if (txq->count == 0) {
-        return;
-    }
-    rte_spinlock_lock(&txq->tx_lock);
     nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
+    if (OVS_UNLIKELY(nb_tx != txq->count)) {
         /* free buffers if we couldn't transmit packets */
         rte_mempool_put_bulk(dev->dpdk_mp->mp,
                              (void **) &txq->burst_pkts[nb_tx],
                              (txq->count - nb_tx));
     }
     txq->count = 0;
+    txq->tsc = rte_get_timer_cycles();
+}
+
+static inline void
+dpdk_queue_flush(struct netdev_dpdk *dev, int qid)
+{
+    struct dpdk_tx_queue *txq = &dev->tx_q[qid];
+
+    if (txq->count == 0) {
+        return;
+    }
+    rte_spinlock_lock(&txq->tx_lock);
+    dpdk_queue_flush__(dev, qid);
     rte_spinlock_unlock(&txq->tx_lock);
 }
 
 static int
-netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct ofpbuf **packets, int *c)
+netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct dpif_packet **packets,
+                     int *c)
 {
     struct netdev_rxq_dpdk *rx = netdev_rxq_dpdk_cast(rxq_);
     struct netdev *netdev = rx->up.netdev;
@@ -595,7 +612,9 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct ofpbuf **packets, int *c)
     dpdk_queue_flush(dev, rxq_->queue_id);
 
     nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
-                             (struct rte_mbuf **) packets, MAX_RX_QUEUE_LEN);
+                             (struct rte_mbuf **) packets,
+                             MIN((int)NETDEV_MAX_RX_BATCH,
+                                 (int)MAX_RX_QUEUE_LEN));
     if (!nb_rx) {
         return EAGAIN;
     }
@@ -606,103 +625,136 @@ netdev_dpdk_rxq_recv(struct netdev_rxq *rxq_, struct ofpbuf **packets, int *c)
 }
 
 inline static void
-dpdk_queue_pkt(struct netdev_dpdk *dev, int qid,
-               struct rte_mbuf *pkt)
+dpdk_queue_pkts(struct netdev_dpdk *dev, int qid,
+               struct rte_mbuf **pkts, int cnt)
 {
     struct dpdk_tx_queue *txq = &dev->tx_q[qid];
     uint64_t diff_tsc;
-    uint64_t cur_tsc;
-    uint32_t nb_tx;
+
+    int i = 0;
 
     rte_spinlock_lock(&txq->tx_lock);
-    txq->burst_pkts[txq->count++] = pkt;
-    if (txq->count == MAX_TX_QUEUE_LEN) {
-        goto flush;
-    }
-    cur_tsc = rte_get_timer_cycles();
-    if (txq->count == 1) {
-        txq->tsc = cur_tsc;
-    }
-    diff_tsc = cur_tsc - txq->tsc;
-    if (diff_tsc >= DRAIN_TSC) {
-        goto flush;
-    }
-    rte_spinlock_unlock(&txq->tx_lock);
-    return;
+    while (i < cnt) {
+        int freeslots = MAX_TX_QUEUE_LEN - txq->count;
+        int tocopy = MIN(freeslots, cnt-i);
 
-flush:
-    nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
-    if (nb_tx != txq->count) {
-        /* free buffers if we couldn't transmit packets */
-        rte_mempool_put_bulk(dev->dpdk_mp->mp,
-                             (void **) &txq->burst_pkts[nb_tx],
-                             (txq->count - nb_tx));
+        memcpy(&txq->burst_pkts[txq->count], &pkts[i],
+               tocopy * sizeof (struct rte_mbuf *));
+
+        txq->count += tocopy;
+        i += tocopy;
+
+        if (txq->count == MAX_TX_QUEUE_LEN) {
+            dpdk_queue_flush__(dev, qid);
+        }
+        diff_tsc = rte_get_timer_cycles() - txq->tsc;
+        if (diff_tsc >= DRAIN_TSC) {
+            dpdk_queue_flush__(dev, qid);
+        }
     }
-    txq->count = 0;
     rte_spinlock_unlock(&txq->tx_lock);
 }
 
 /* Tx function. Transmit packets indefinitely */
 static void
-dpdk_do_tx_copy(struct netdev *netdev, char *buf, int size)
+dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-    struct rte_mbuf *pkt;
+    struct rte_mbuf *mbufs[cnt];
+    int dropped = 0;
+    int newcnt = 0;
+    int i;
+
+    for (i = 0; i < cnt; i++) {
+        int size = ofpbuf_size(&pkts[i]->ofpbuf);
+        if (OVS_UNLIKELY(size > dev->max_packet_len)) {
+            VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                         (int)size , dev->max_packet_len);
+
+            dropped++;
+            continue;
+        }
+
+        mbufs[newcnt] = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
+
+        if (!mbufs[newcnt]) {
+            dropped += cnt - i;
+            break;
+        }
 
-    pkt = rte_pktmbuf_alloc(dev->dpdk_mp->mp);
-    if (!pkt) {
+        /* We have to do a copy for now */
+        memcpy(mbufs[newcnt]->pkt.data, ofpbuf_data(&pkts[i]->ofpbuf), size);
+
+        rte_pktmbuf_data_len(mbufs[newcnt]) = size;
+        rte_pktmbuf_pkt_len(mbufs[newcnt]) = size;
+
+        newcnt++;
+    }
+
+    if (OVS_UNLIKELY(dropped)) {
         ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
+        dev->stats.tx_dropped += dropped;
         ovs_mutex_unlock(&dev->mutex);
-        return;
     }
 
-    /* We have to do a copy for now */
-    memcpy(pkt->pkt.data, buf, size);
-
-    rte_pktmbuf_data_len(pkt) = size;
-    rte_pktmbuf_pkt_len(pkt) = size;
-
-    dpdk_queue_pkt(dev, NON_PMD_THREAD_TX_QUEUE, pkt);
+    dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
     dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
 }
 
 static int
-netdev_dpdk_send(struct netdev *netdev,
-                 struct ofpbuf *ofpbuf, bool may_steal)
+netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
+                 bool may_steal)
 {
     struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
     int ret;
+    int i;
 
-    if (ofpbuf_size(ofpbuf) > dev->max_packet_len) {
-        VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
-                     (int)ofpbuf_size(ofpbuf) , dev->max_packet_len);
-
-        ovs_mutex_lock(&dev->mutex);
-        dev->stats.tx_dropped++;
-        ovs_mutex_unlock(&dev->mutex);
-
-        ret = E2BIG;
-        goto out;
-    }
-
-    if (!may_steal || ofpbuf->source != OFPBUF_DPDK) {
-        dpdk_do_tx_copy(netdev, (char *) ofpbuf_data(ofpbuf), ofpbuf_size(ofpbuf));
+    if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
+        dpdk_do_tx_copy(netdev, pkts, cnt);
 
         if (may_steal) {
-            ofpbuf_delete(ofpbuf);
+            for (i = 0; i < cnt; i++) {
+                dpif_packet_delete(pkts[i]);
+            }
         }
     } else {
         int qid;
+        int next_tx_idx = 0;
+        int dropped = 0;
 
         qid = rte_lcore_id() % NR_QUEUE;
 
-        dpdk_queue_pkt(dev, qid, (struct rte_mbuf *)ofpbuf);
+        for (i = 0; i < cnt; i++) {
+            int size = ofpbuf_size(&pkts[i]->ofpbuf);
+            if (OVS_UNLIKELY(size > dev->max_packet_len)) {
+                if (next_tx_idx != i) {
+                    dpdk_queue_pkts(dev, qid,
+                                    (struct rte_mbuf **)&pkts[next_tx_idx],
+                                    i-next_tx_idx);
+                }
+
+                VLOG_WARN_RL(&rl, "Too big size %d max_packet_len %d",
+                             (int)size , dev->max_packet_len);
+
+                dpif_packet_delete(pkts[i]);
+                dropped++;
+                next_tx_idx = i + 1;
+            }
+        }
+        if (next_tx_idx != cnt) {
+           dpdk_queue_pkts(dev, qid,
+                            (struct rte_mbuf **)&pkts[next_tx_idx],
+                            cnt-next_tx_idx);
+        }
 
+        if (OVS_UNLIKELY(dropped)) {
+            ovs_mutex_lock(&dev->mutex);
+            dev->stats.tx_dropped += dropped;
+            ovs_mutex_unlock(&dev->mutex);
+        }
     }
     ret = 0;
 
-out:
     return ret;
 }
 
@@ -778,7 +830,6 @@ netdev_dpdk_set_mtu(const struct netdev *netdev, int mtu)
 
     err = dpdk_eth_dev_init(dev);
     if (err) {
-
         dpdk_mp_put(mp);
         dev->mtu = old_mtu;
         dev->dpdk_mp = old_mp;
@@ -1176,21 +1227,29 @@ dpdk_init(int argc, char **argv)
 {
     int result;
 
-    if (strcmp(argv[1], "--dpdk"))
+    if (argc < 2 || strcmp(argv[1], "--dpdk"))
         return 0;
 
+    /* Make sure program name passed to rte_eal_init() is vswitchd. */
+    argv[1] = argv[0];
+
     argc--;
     argv++;
 
     /* Make sure things are initialized ... */
     result = rte_eal_init(argc, argv);
-    if (result < 0)
+    if (result < 0) {
         ovs_abort(result, "Cannot init EAL\n");
+    }
 
     rte_memzone_dump();
     rte_eal_init_ret = 0;
 
-    return result;
+    if (argc > result) {
+        argv[result] = argv[0];
+    }
+
+    return result + 1;
 }
 
 void