#include "odp-util.h"
#include "ofp-print.h"
#include "ofpbuf.h"
+#include "ovs-numa.h"
#include "ovs-thread.h"
#include "ovs-rcu.h"
#include "packet-dpif.h"
#define MBUF_SIZE(mtu) (MTU_TO_MAX_LEN(mtu) + (512) + \
sizeof(struct rte_mbuf) + RTE_PKTMBUF_HEADROOM)
-/* TODO: mempool size should be based on system resources. */
+/* XXX: mempool size should be based on system resources. */
#define NB_MBUF (4096 * 64)
#define MP_CACHE_SZ (256 * 2)
#define SOCKET0 0
-#define NON_PMD_THREAD_TX_QUEUE 0
-
#define NIC_PORT_RX_Q_SIZE 2048 /* Size of Physical NIC RX Queue, Max (n+32<=4096)*/
#define NIC_PORT_TX_Q_SIZE 2048 /* Size of Physical NIC TX Queue, Max (n+32<=4096)*/
-/* TODO: Needs per NIC value for these constants. */
+/* XXX: Needs per NIC value for these constants. */
#define RX_PTHRESH 32 /* Default values of RX prefetch threshold reg. */
#define RX_HTHRESH 32 /* Default values of RX host threshold reg. */
#define RX_WTHRESH 16 /* Default values of RX write-back threshold reg. */
.rx_adv_conf = {
.rss_conf = {
.rss_key = NULL,
- .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6,
+ .rss_hf = ETH_RSS_IPV4_TCP | ETH_RSS_IPV4 | ETH_RSS_IPV6
+ | ETH_RSS_IPV4_UDP | ETH_RSS_IPV6_TCP | ETH_RSS_IPV6_UDP,
},
},
.txmode = {
.txq_flags = ETH_TXQ_FLAGS_NOMULTSEGS|ETH_TXQ_FLAGS_NOOFFLOADS,
};
-enum { MAX_RX_QUEUE_LEN = 64 };
-enum { MAX_TX_QUEUE_LEN = 64 };
+enum { MAX_RX_QUEUE_LEN = 192 };
+enum { MAX_TX_QUEUE_LEN = 384 };
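+/* Size of the dpdkr (ivshmem) rings.  DPDK requires rte_ring sizes to be
+ * a power of 2, which the BUILD_ASSERT_DECL below checks at compile time. */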
+enum { DPDK_RING_SIZE = 256 };
+BUILD_ASSERT_DECL(IS_POW2(DPDK_RING_SIZE));
enum { DRAIN_TSC = 200000ULL };
static int rte_eal_init_ret = ENODEV;
static struct ovs_mutex dpdk_mutex = OVS_MUTEX_INITIALIZER;
/* Contains all 'struct dpdk_dev's. */
-static struct list dpdk_list OVS_GUARDED_BY(dpdk_mutex)
+static struct ovs_list dpdk_list OVS_GUARDED_BY(dpdk_mutex)
= LIST_INITIALIZER(&dpdk_list);
-static struct list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex)
+static struct ovs_list dpdk_mp_list OVS_GUARDED_BY(dpdk_mutex)
= LIST_INITIALIZER(&dpdk_mp_list);
/* This mutex must be used by non pmd threads when allocating or freeing
int mtu;
int socket_id;
int refcount;
- struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+ struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
};
+/* There should be one 'struct dpdk_tx_queue' created for
+ * each cpu core. */
struct dpdk_tx_queue {
- rte_spinlock_t tx_lock;
+    bool flush_tx;              /* Set to true to flush queue every time
+                                 * packets are queued. */
int count;
uint64_t tsc;
struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
so we have to keep them around once they've been created
*/
-static struct list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
+static struct ovs_list dpdk_ring_list OVS_GUARDED_BY(dpdk_mutex)
= LIST_INITIALIZER(&dpdk_ring_list);
struct dpdk_ring {
struct rte_ring *cring_rx;
int user_port_id; /* User given port no, parsed from port name */
int eth_port_id; /* ethernet device port id */
- struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+ struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
};
struct netdev_dpdk {
int port_id;
int max_packet_len;
- struct dpdk_tx_queue tx_q[NR_QUEUE];
+ struct dpdk_tx_queue *tx_q;
struct ovs_mutex mutex OVS_ACQ_AFTER(dpdk_mutex);
int mtu;
int socket_id;
int buf_size;
- struct netdev_stats stats_offset;
struct netdev_stats stats;
uint8_t hwaddr[ETH_ADDR_LEN];
int link_reset_cnt;
/* In dpdk_list. */
- struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+ struct ovs_list list_node OVS_GUARDED_BY(dpdk_mutex);
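+    /* dpdk ring devices have a single tx queue; this lock serializes
+     * access to it (see netdev_dpdk_ring_send()). */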
+ rte_spinlock_t dpdkr_tx_lock;
};
struct netdev_rxq_dpdk {
return class->construct == netdev_dpdk_construct;
}
-/* TODO: use dpdk malloc for entire OVS. infact huge page shld be used
- * for all other sengments data, bss and text. */
+/* XXX: use dpdk malloc for entire OVS.  In fact, huge pages should be
+ * used for all other segments: data, bss and text. */
static void *
dmp->mtu = mtu;
dmp->refcount = 1;
- if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d", dmp->mtu) < 0) {
+ if (snprintf(mp_name, RTE_MEMPOOL_NAMESIZE, "ovs_mp_%d_%d", dmp->mtu,
+ dmp->socket_id) < 0) {
return NULL;
}
return ENODEV;
}
- diag = rte_eth_dev_configure(dev->port_id, NR_QUEUE, NR_QUEUE, &port_conf);
+ diag = rte_eth_dev_configure(dev->port_id, dev->up.n_rxq, dev->up.n_txq,
+ &port_conf);
if (diag) {
VLOG_ERR("eth dev config error %d",diag);
return -diag;
}
- for (i = 0; i < NR_QUEUE; i++) {
+ for (i = 0; i < dev->up.n_txq; i++) {
diag = rte_eth_tx_queue_setup(dev->port_id, i, NIC_PORT_TX_Q_SIZE,
dev->socket_id, &tx_conf);
if (diag) {
}
}
- for (i = 0; i < NR_QUEUE; i++) {
+ for (i = 0; i < dev->up.n_rxq; i++) {
diag = rte_eth_rx_queue_setup(dev->port_id, i, NIC_PORT_RX_Q_SIZE,
dev->socket_id,
&rx_conf, dev->dpdk_mp->mp);
return &netdev->up;
}
+static void
+netdev_dpdk_alloc_txq(struct netdev_dpdk *netdev, unsigned int n_txqs)
+{
+ int i;
+
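+    /* dpdk_rte_mzalloc() returns zeroed memory, so each queue's 'count'
+     * and 'tsc' start out as zero. */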
+ netdev->tx_q = dpdk_rte_mzalloc(n_txqs * sizeof *netdev->tx_q);
+    /* Each index is treated as a cpu core id, since there should
+     * be one tx queue for each cpu core. */
+ for (i = 0; i < n_txqs; i++) {
+ int numa_id = ovs_numa_get_numa_id(i);
+
+        /* If the corresponding core is not on the same numa node
+         * as 'netdev', flag 'flush_tx' so that packets are sent out
+         * immediately instead of waiting on a queue that is rarely
+         * drained. */
+        netdev->tx_q[i].flush_tx = netdev->socket_id != numa_id;
+ }
+}
+
static int
-netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no) OVS_REQUIRES(dpdk_mutex)
+netdev_dpdk_init(struct netdev *netdev_, unsigned int port_no)
+ OVS_REQUIRES(dpdk_mutex)
{
struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+ int sid;
int err = 0;
- int i;
ovs_mutex_init(&netdev->mutex);
ovs_mutex_lock(&netdev->mutex);
- for (i = 0; i < NR_QUEUE; i++) {
- rte_spinlock_init(&netdev->tx_q[i].tx_lock);
- }
-
+    /* If 'sid' is negative, it means that the kernel failed
+     * to obtain the pci numa info.  In that situation, always
+     * use 'SOCKET0'. */
+ sid = rte_eth_dev_socket_id(port_no);
+ netdev->socket_id = sid < 0 ? SOCKET0 : sid;
+ netdev_dpdk_alloc_txq(netdev, NR_QUEUE);
netdev->port_id = port_no;
-
netdev->flags = 0;
netdev->mtu = ETHER_MTU;
netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
-
- /* TODO: need to discover device node at run time. */
- netdev->socket_id = SOCKET0;
+ rte_spinlock_init(&netdev->dpdkr_tx_lock);
netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
if (!netdev->dpdk_mp) {
goto unlock;
}
+ netdev_->n_txq = NR_QUEUE;
+ netdev_->n_rxq = NR_QUEUE;
err = dpdk_eth_dev_init(netdev);
if (err) {
goto unlock;
}
- netdev_->n_rxq = NR_QUEUE;
list_push_back(&dpdk_list, &netdev->list_node);
unlock:
+ if (err) {
+ rte_free(netdev->tx_q);
+ }
ovs_mutex_unlock(&netdev->mutex);
return err;
}
ovs_mutex_unlock(&dev->mutex);
ovs_mutex_lock(&dpdk_mutex);
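+    /* Free the per-core tx queues allocated in netdev_dpdk_init(). */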
+ rte_free(dev->tx_q);
list_remove(&dev->list_node);
dpdk_mp_put(dev->dpdk_mp);
ovs_mutex_unlock(&dpdk_mutex);
ovs_mutex_lock(&dev->mutex);
- /* TODO: Allow to configure number of queues. */
- smap_add_format(args, "configured_rx_queues", "%u", netdev_->n_rxq);
- smap_add_format(args, "configured_tx_queues", "%u", netdev_->n_rxq);
+ smap_add_format(args, "configured_rx_queues", "%d", netdev_->n_rxq);
+ smap_add_format(args, "configured_tx_queues", "%d", netdev_->n_txq);
ovs_mutex_unlock(&dev->mutex);
return 0;
}
+static int
+netdev_dpdk_get_numa_id(const struct netdev *netdev_)
+{
+ struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+
+ return netdev->socket_id;
+}
+
+/* Sets the number of tx queues and rx queues for the dpdk interface.
+ * If the configuration fails, do not try to restore its old configuration
+ * and just return the error. */
+static int
+netdev_dpdk_set_multiq(struct netdev *netdev_, unsigned int n_txq,
+ unsigned int n_rxq)
+{
+ struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+ int err = 0;
+
+ if (netdev->up.n_txq == n_txq && netdev->up.n_rxq == n_rxq) {
+ return err;
+ }
+
+ ovs_mutex_lock(&dpdk_mutex);
+ ovs_mutex_lock(&netdev->mutex);
+
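+    /* The device has to be stopped before its queues can be
+     * reconfigured; dpdk_eth_dev_init() below brings it back up. */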
+ rte_eth_dev_stop(netdev->port_id);
+
+ netdev->up.n_txq = n_txq;
+ netdev->up.n_rxq = n_rxq;
+ rte_free(netdev->tx_q);
+ netdev_dpdk_alloc_txq(netdev, n_txq);
+ err = dpdk_eth_dev_init(netdev);
+
+ ovs_mutex_unlock(&netdev->mutex);
+ ovs_mutex_unlock(&dpdk_mutex);
+
+ return err;
+}
+
static struct netdev_rxq *
netdev_dpdk_rxq_alloc(void)
{
dpdk_queue_flush__(struct netdev_dpdk *dev, int qid)
{
struct dpdk_tx_queue *txq = &dev->tx_q[qid];
- uint32_t nb_tx;
+ uint32_t nb_tx = 0;
+
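+    /* rte_eth_tx_burst() may transmit fewer packets than requested when
+     * the NIC tx ring is full, so retry until it stops making progress;
+     * whatever is still left over is dropped below. */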
+ while (nb_tx != txq->count) {
+ uint32_t ret;
+
+ ret = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts + nb_tx,
+ txq->count - nb_tx);
+ if (!ret) {
+ break;
+ }
+
+ nb_tx += ret;
+ }
- nb_tx = rte_eth_tx_burst(dev->port_id, qid, txq->burst_pkts, txq->count);
if (OVS_UNLIKELY(nb_tx != txq->count)) {
/* free buffers, which we couldn't transmit, one at a time (each
* packet could come from a different mempool) */
for (i = nb_tx; i < txq->count; i++) {
rte_pktmbuf_free_seg(txq->burst_pkts[i]);
}
+ ovs_mutex_lock(&dev->mutex);
+        dev->stats.tx_dropped += txq->count - nb_tx;
+ ovs_mutex_unlock(&dev->mutex);
}
+
txq->count = 0;
txq->tsc = rte_get_timer_cycles();
}
if (txq->count == 0) {
return;
}
- rte_spinlock_lock(&txq->tx_lock);
dpdk_queue_flush__(dev, qid);
- rte_spinlock_unlock(&txq->tx_lock);
}
static int
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
int nb_rx;
- dpdk_queue_flush(dev, rxq_->queue_id);
+    /* There is only one tx queue for this core.  Do not flush other
+     * queues. */
+ if (rxq_->queue_id == rte_lcore_id()) {
+ dpdk_queue_flush(dev, rxq_->queue_id);
+ }
nb_rx = rte_eth_rx_burst(rx->port_id, rxq_->queue_id,
(struct rte_mbuf **) packets,
int i = 0;
- rte_spinlock_lock(&txq->tx_lock);
while (i < cnt) {
int freeslots = MAX_TX_QUEUE_LEN - txq->count;
int tocopy = MIN(freeslots, cnt-i);
txq->count += tocopy;
i += tocopy;
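+        /* Flush when the queue is full, or after every copy when
+         * 'flush_tx' is set for this queue. */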
- if (txq->count == MAX_TX_QUEUE_LEN) {
+ if (txq->count == MAX_TX_QUEUE_LEN || txq->flush_tx) {
dpdk_queue_flush__(dev, qid);
}
diff_tsc = rte_get_timer_cycles() - txq->tsc;
dpdk_queue_flush__(dev, qid);
}
}
- rte_spinlock_unlock(&txq->tx_lock);
}
/* Tx function. Transmit packets indefinitely */
static void
-dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
+dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dpif_packet ** pkts,
+ int cnt)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
ovs_mutex_unlock(&dev->mutex);
}
- dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
- dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
+ dpdk_queue_pkts(dev, qid, mbufs, newcnt);
+ dpdk_queue_flush(dev, qid);
if (!thread_is_pmd()) {
ovs_mutex_unlock(&nonpmd_mempool_mutex);
}
}
-static int
-netdev_dpdk_send(struct netdev *netdev, struct dpif_packet **pkts, int cnt,
- bool may_steal)
+static inline void
+netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
+ struct dpif_packet **pkts, int cnt, bool may_steal)
{
- struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
- int ret;
int i;
- if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
- dpdk_do_tx_copy(netdev, pkts, cnt);
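+    /* Packets that we may not steal, or that were not allocated from a
+     * DPDK mempool, must first be copied into DPDK mbufs. */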
+ if (OVS_UNLIKELY(!may_steal ||
+ pkts[0]->ofpbuf.source != OFPBUF_DPDK)) {
+ struct netdev *netdev = &dev->up;
+
+ dpdk_do_tx_copy(netdev, qid, pkts, cnt);
if (may_steal) {
for (i = 0; i < cnt; i++) {
}
}
} else {
- int qid;
int next_tx_idx = 0;
int dropped = 0;
- qid = rte_lcore_id() % NR_QUEUE;
-
for (i = 0; i < cnt; i++) {
int size = ofpbuf_size(&pkts[i]->ofpbuf);
if (OVS_UNLIKELY(size > dev->max_packet_len)) {
ovs_mutex_unlock(&dev->mutex);
}
}
- ret = 0;
+}
- return ret;
+static int
+netdev_dpdk_eth_send(struct netdev *netdev, int qid,
+ struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+ struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+ netdev_dpdk_send__(dev, qid, pkts, cnt, may_steal);
+ return 0;
}
static int
ovs_mutex_lock(&dev->mutex);
rte_eth_stats_get(dev->port_id, &rte_stats);
- *stats = dev->stats_offset;
-
- stats->rx_packets += rte_stats.ipackets;
- stats->tx_packets += rte_stats.opackets;
- stats->rx_bytes += rte_stats.ibytes;
- stats->tx_bytes += rte_stats.obytes;
- stats->rx_errors += rte_stats.ierrors;
- stats->tx_errors += rte_stats.oerrors;
- stats->multicast += rte_stats.imcasts;
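+    /* Zero everything first; counters that dpdk does not support are
+     * then reported as zero rather than as stale values. */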
+ memset(stats, 0, sizeof(*stats));
- stats->tx_dropped += dev->stats.tx_dropped;
- ovs_mutex_unlock(&dev->mutex);
-
- return 0;
-}
+ stats->rx_packets = rte_stats.ipackets;
+ stats->tx_packets = rte_stats.opackets;
+ stats->rx_bytes = rte_stats.ibytes;
+ stats->tx_bytes = rte_stats.obytes;
+ stats->rx_errors = rte_stats.ierrors;
+ stats->tx_errors = rte_stats.oerrors;
+ stats->multicast = rte_stats.imcasts;
-static int
-netdev_dpdk_set_stats(struct netdev *netdev, const struct netdev_stats *stats)
-{
- struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
-
- ovs_mutex_lock(&dev->mutex);
- dev->stats_offset = *stats;
+ stats->tx_dropped = dev->stats.tx_dropped;
ovs_mutex_unlock(&dev->mutex);
return 0;
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev_);
struct rte_eth_dev_info dev_info;
- if (dev->port_id <= 0)
+ if (dev->port_id < 0)
return ENODEV;
ovs_mutex_lock(&dev->mutex);
{
int result;
- result = rte_pmd_init_all();
- if (result) {
- VLOG_ERR("Cannot init PMD");
- return -result;
- }
-
result = rte_eal_pci_probe();
if (result) {
VLOG_ERR("Cannot probe PCI");
return ENOMEM;
}
+    /* XXX: Add support for multiqueue ring. */
err = snprintf(ring_name, 10, "%s_tx", dev_name);
if (err < 0) {
return -err;
}
- ivshmem->cring_tx = rte_ring_create(ring_name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
+ /* Create single consumer/producer rings, netdev does explicit locking. */
+ ivshmem->cring_tx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
if (ivshmem->cring_tx == NULL) {
rte_free(ivshmem);
return ENOMEM;
return -err;
}
- ivshmem->cring_rx = rte_ring_create(ring_name, MAX_RX_QUEUE_LEN, SOCKET0, 0);
+ /* Create single consumer/producer rings, netdev does explicit locking. */
+ ivshmem->cring_rx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
if (ivshmem->cring_rx == NULL) {
rte_free(ivshmem);
return ENOMEM;
}
- err = rte_eth_from_rings(&ivshmem->cring_rx, 1, &ivshmem->cring_tx, 1, SOCKET0);
+ err = rte_eth_from_rings(dev_name, &ivshmem->cring_rx, 1,
+ &ivshmem->cring_tx, 1, SOCKET0);
+
if (err < 0) {
rte_free(ivshmem);
return ENODEV;
return dpdk_ring_create(dev_name, port_no, eth_port_id);
}
+static int
+netdev_dpdk_ring_send(struct netdev *netdev, int qid OVS_UNUSED,
+ struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+ struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+    /* DPDK Rings have a single TX queue, therefore they need locking. */
+ rte_spinlock_lock(&dev->dpdkr_tx_lock);
+ netdev_dpdk_send__(dev, 0, pkts, cnt, may_steal);
+ rte_spinlock_unlock(&dev->dpdkr_tx_lock);
+ return 0;
+}
+
static int
netdev_dpdk_ring_construct(struct netdev *netdev)
{
return err;
}
-#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT) \
+#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT, MULTIQ, SEND) \
{ \
NAME, \
INIT, /* init */ \
netdev_dpdk_get_config, \
NULL, /* netdev_dpdk_set_config */ \
NULL, /* get_tunnel_config */ \
+ NULL, /* build header */ \
+ NULL, /* push header */ \
+ NULL, /* pop header */ \
+ netdev_dpdk_get_numa_id, /* get_numa_id */ \
+ MULTIQ, /* set_multiq */ \
\
- netdev_dpdk_send, /* send */ \
+ SEND, /* send */ \
NULL, /* send_wait */ \
\
netdev_dpdk_set_etheraddr, \
netdev_dpdk_get_carrier_resets, \
netdev_dpdk_set_miimon, \
netdev_dpdk_get_stats, \
- netdev_dpdk_set_stats, \
netdev_dpdk_get_features, \
NULL, /* set_advertisements */ \
\
ovs_abort(result, "Cannot init EAL\n");
}
- rte_memzone_dump();
+ rte_memzone_dump(stdout);
rte_eal_init_ret = 0;
if (argc > result) {
NETDEV_DPDK_CLASS(
"dpdk",
dpdk_class_init,
- netdev_dpdk_construct);
+ netdev_dpdk_construct,
+ netdev_dpdk_set_multiq,
+ netdev_dpdk_eth_send);
const struct netdev_class dpdk_ring_class =
NETDEV_DPDK_CLASS(
"dpdkr",
NULL,
- netdev_dpdk_ring_construct);
+ netdev_dpdk_ring_construct,
+ NULL,
+ netdev_dpdk_ring_send);
void
netdev_dpdk_register(void)
return err;
}
-/* lcore_id 0 is reseved for use by non pmd threads. */
+/* lcore_id 0 is reserved for use by non pmd threads. */
- RTE_PER_LCORE(_lcore_id) = cpu + 1;
+ ovs_assert(cpu);
+ RTE_PER_LCORE(_lcore_id) = cpu;
return 0;
}
void
thread_set_nonpmd(void)
{
- /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
- * for non pmd threads */
- BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
/* We have to use 0 to allow non pmd threads to perform certain DPDK
* operations, like rte_eth_dev_configure(). */
RTE_PER_LCORE(_lcore_id) = 0;