#include "odp-util.h"
#include "ofp-print.h"
#include "ofpbuf.h"
+#include "ovs-numa.h"
#include "ovs-thread.h"
#include "ovs-rcu.h"
#include "packet-dpif.h"
#define MP_CACHE_SZ (256 * 2)
#define SOCKET0 0
-#define NON_PMD_THREAD_TX_QUEUE 0
-
#define NIC_PORT_RX_Q_SIZE 2048 /* Size of Physical NIC RX Queue, Max (n+32<=4096)*/
#define NIC_PORT_TX_Q_SIZE 2048 /* Size of Physical NIC TX Queue, Max (n+32<=4096)*/
/* There should be one 'struct dpdk_tx_queue' created for
* each cpu core. */
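+/* Each queue is private to one core, so the fast path takes no lock:
+ * packets are batched in 'burst_pkts' and pushed to the NIC when the
+ * batch fills up, or immediately when 'flush_tx' is set. */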
struct dpdk_tx_queue {
- rte_spinlock_t tx_lock;
+ bool flush_tx; /* If true, flush the queue every time
+ * packets are enqueued. */
int count;
uint64_t tsc;
struct rte_mbuf *burst_pkts[MAX_TX_QUEUE_LEN];
/* In dpdk_list. */
struct list list_node OVS_GUARDED_BY(dpdk_mutex);
+ rte_spinlock_t dpdkr_tx_lock;
};
struct netdev_rxq_dpdk {
}
static void
-netdev_dpdk_set_txq(struct netdev_dpdk *netdev, unsigned int n_txqs)
+netdev_dpdk_alloc_txq(struct netdev_dpdk *netdev, unsigned int n_txqs)
{
int i;
netdev->tx_q = dpdk_rte_mzalloc(n_txqs * sizeof *netdev->tx_q);
+ /* Each index is treated as a cpu core id, since there should
+ * be one tx queue for each cpu core. */
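+ /* E.g., assuming pmd threads are pinned one per core, the pmd
+ * thread on core 2 only ever uses tx_q[2]. */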
for (i = 0; i < n_txqs; i++) {
- rte_spinlock_init(&netdev->tx_q[i].tx_lock);
+ int numa_id = ovs_numa_get_numa_id(i);
+
+ /* Flush the queue on every enqueue when the corresponding core
+ * is on the same numa node as 'netdev'. */
+ netdev->tx_q[i].flush_tx = netdev->socket_id == numa_id;
}
}
OVS_REQUIRES(dpdk_mutex)
{
struct netdev_dpdk *netdev = netdev_dpdk_cast(netdev_);
+ int sid;
int err = 0;
ovs_mutex_init(&netdev->mutex);
ovs_mutex_lock(&netdev->mutex);
- netdev_dpdk_set_txq(netdev, NR_QUEUE);
+ /* If 'sid' is negative, the kernel failed to obtain the pci
+ * numa info. In that case, always use 'SOCKET0'. */
+ sid = rte_eth_dev_socket_id(port_no);
+ netdev->socket_id = sid < 0 ? SOCKET0 : sid;
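+ /* 'socket_id' also determines where the mempool is allocated (see
+ * the dpdk_mp_get() call below) and which tx queues are flushed
+ * eagerly in netdev_dpdk_alloc_txq(). */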
+ netdev_dpdk_alloc_txq(netdev, NR_QUEUE);
netdev->port_id = port_no;
netdev->flags = 0;
netdev->mtu = ETHER_MTU;
netdev->max_packet_len = MTU_TO_MAX_LEN(netdev->mtu);
-
- /* XXX: need to discover device node at run time. */
- netdev->socket_id = SOCKET0;
+ rte_spinlock_init(&netdev->dpdkr_tx_lock);
netdev->dpdk_mp = dpdk_mp_get(netdev->socket_id, netdev->mtu);
if (!netdev->dpdk_mp) {
ovs_mutex_lock(&dev->mutex);
- /* XXX: Allow to configure number of queues. */
- smap_add_format(args, "configured_rx_queues", "%u", netdev_->n_rxq);
- smap_add_format(args, "configured_tx_queues", "%u", netdev_->n_rxq);
+ smap_add_format(args, "configured_rx_queues", "%d", netdev_->n_rxq);
+ smap_add_format(args, "configured_tx_queues", "%d", netdev_->n_txq);
ovs_mutex_unlock(&dev->mutex);
return 0;
return err;
}
+ ovs_mutex_lock(&dpdk_mutex);
ovs_mutex_lock(&netdev->mutex);
+
rte_eth_dev_stop(netdev->port_id);
+
netdev->up.n_txq = n_txq;
netdev->up.n_rxq = n_rxq;
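+ /* The tx queue array is sized by the queue count, so rebuild it
+ * before reinitializing the device with the new 'n_txq'. */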
+ rte_free(netdev->tx_q);
+ netdev_dpdk_alloc_txq(netdev, n_txq);
err = dpdk_eth_dev_init(netdev);
- if (!err && netdev->up.n_txq != n_txq) {
- rte_free(netdev->tx_q);
- netdev_dpdk_set_txq(netdev, n_txq);
- }
+
ovs_mutex_unlock(&netdev->mutex);
+ ovs_mutex_unlock(&dpdk_mutex);
return err;
}
if (txq->count == 0) {
return;
}
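+ /* No lock is needed here: 'qid' belongs to the calling core, and
+ * dpdkr senders serialize on 'dpdkr_tx_lock' instead. */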
- rte_spinlock_lock(&txq->tx_lock);
dpdk_queue_flush__(dev, qid);
- rte_spinlock_unlock(&txq->tx_lock);
}
static int
int i = 0;
- rte_spinlock_lock(&txq->tx_lock);
while (i < cnt) {
int freeslots = MAX_TX_QUEUE_LEN - txq->count;
int tocopy = MIN(freeslots, cnt-i);
txq->count += tocopy;
i += tocopy;
- if (txq->count == MAX_TX_QUEUE_LEN) {
+ if (txq->count == MAX_TX_QUEUE_LEN || txq->flush_tx) {
dpdk_queue_flush__(dev, qid);
}
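+ /* Time-based drain: a partially filled queue is still flushed once
+ * 'tsc' shows it has been held past the drain interval. */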
diff_tsc = rte_get_timer_cycles() - txq->tsc;
dpdk_queue_flush__(dev, qid);
}
}
- rte_spinlock_unlock(&txq->tx_lock);
}
/* Tx function. Transmit packets indefinitely */
static void
-dpdk_do_tx_copy(struct netdev *netdev, struct dpif_packet ** pkts, int cnt)
+dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dpif_packet ** pkts,
+ int cnt)
OVS_NO_THREAD_SAFETY_ANALYSIS
{
struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
ovs_mutex_unlock(&dev->mutex);
}
- dpdk_queue_pkts(dev, NON_PMD_THREAD_TX_QUEUE, mbufs, newcnt);
- dpdk_queue_flush(dev, NON_PMD_THREAD_TX_QUEUE);
+ dpdk_queue_pkts(dev, qid, mbufs, newcnt);
+ dpdk_queue_flush(dev, qid);
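+ /* The flush here is unconditional, presumably because a copying
+ * sender (e.g. a non pmd thread) may not send again soon enough to
+ * drain a partial batch. */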
if (!thread_is_pmd()) {
ovs_mutex_unlock(&nonpmd_mempool_mutex);
}
}
-static int
-netdev_dpdk_send(struct netdev *netdev, int qid, struct dpif_packet **pkts,
- int cnt, bool may_steal)
+static inline void
+netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
+ struct dpif_packet **pkts, int cnt, bool may_steal)
{
- struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
- int ret;
int i;
- if (!may_steal || pkts[0]->ofpbuf.source != OFPBUF_DPDK) {
- dpdk_do_tx_copy(netdev, pkts, cnt);
+ if (OVS_UNLIKELY(!may_steal ||
+ pkts[0]->ofpbuf.source != OFPBUF_DPDK)) {
+ struct netdev *netdev = &dev->up;
+
+ dpdk_do_tx_copy(netdev, qid, pkts, cnt);
if (may_steal) {
for (i = 0; i < cnt; i++) {
int next_tx_idx = 0;
int dropped = 0;
- qid = rte_lcore_id();
-
for (i = 0; i < cnt; i++) {
int size = ofpbuf_size(&pkts[i]->ofpbuf);
if (OVS_UNLIKELY(size > dev->max_packet_len)) {
ovs_mutex_unlock(&dev->mutex);
}
}
- ret = 0;
+}
- return ret;
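+/* Sends 'pkts' on a physical device. Each core owns tx queue 'qid',
+ * so no locking is needed. */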
+static int
+netdev_dpdk_eth_send(struct netdev *netdev, int qid,
+ struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+ struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+ netdev_dpdk_send__(dev, qid, pkts, cnt, may_steal);
+ return 0;
}
static int
return ENOMEM;
}
+ /* XXX: Add support for multiqueue ring. */
err = snprintf(ring_name, 10, "%s_tx", dev_name);
if (err < 0) {
return -err;
}
- ivshmem->cring_tx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0, 0);
+ /* Create single producer/consumer rings; the netdev does explicit locking. */
+ ivshmem->cring_tx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
if (ivshmem->cring_tx == NULL) {
rte_free(ivshmem);
return ENOMEM;
return -err;
}
- ivshmem->cring_rx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0, 0);
+ /* Create single producer/consumer rings; the netdev does explicit locking. */
+ ivshmem->cring_rx = rte_ring_create(ring_name, DPDK_RING_SIZE, SOCKET0,
+ RING_F_SP_ENQ | RING_F_SC_DEQ);
if (ivshmem->cring_rx == NULL) {
rte_free(ivshmem);
return ENOMEM;
return dpdk_ring_create(dev_name, port_no, eth_port_id);
}
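+/* Sends 'pkts' on a 'dpdkr' device. The rings are created single
+ * producer/consumer (RING_F_SP_ENQ | RING_F_SC_DEQ above), so all
+ * senders share tx queue 0 under 'dpdkr_tx_lock'. */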
+static int
+netdev_dpdk_ring_send(struct netdev *netdev, int qid OVS_UNUSED,
+ struct dpif_packet **pkts, int cnt, bool may_steal)
+{
+ struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
+
+ /* DPDK rings have a single TX queue, therefore they need locking. */
+ rte_spinlock_lock(&dev->dpdkr_tx_lock);
+ netdev_dpdk_send__(dev, 0, pkts, cnt, may_steal);
+ rte_spinlock_unlock(&dev->dpdkr_tx_lock);
+ return 0;
+}
+
static int
netdev_dpdk_ring_construct(struct netdev *netdev)
{
return err;
}
-#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT, MULTIQ) \
+#define NETDEV_DPDK_CLASS(NAME, INIT, CONSTRUCT, MULTIQ, SEND) \
{ \
NAME, \
INIT, /* init */ \
netdev_dpdk_get_config, \
NULL, /* netdev_dpdk_set_config */ \
NULL, /* get_tunnel_config */ \
+ NULL, /* build header */ \
+ NULL, /* push header */ \
+ NULL, /* pop header */ \
netdev_dpdk_get_numa_id, /* get_numa_id */ \
MULTIQ, /* set_multiq */ \
\
- netdev_dpdk_send, /* send */ \
+ SEND, /* send */ \
NULL, /* send_wait */ \
\
netdev_dpdk_set_etheraddr, \
"dpdk",
dpdk_class_init,
netdev_dpdk_construct,
- netdev_dpdk_set_multiq);
+ netdev_dpdk_set_multiq,
+ netdev_dpdk_eth_send);
const struct netdev_class dpdk_ring_class =
NETDEV_DPDK_CLASS(
"dpdkr",
NULL,
netdev_dpdk_ring_construct,
- NULL);
+ NULL,
+ netdev_dpdk_ring_send);
void
netdev_dpdk_register(void)
return err;
}
/* lcore_id 0 is reserved for use by non pmd threads. */
- RTE_PER_LCORE(_lcore_id) = cpu + 1;
+ ovs_assert(cpu);
+ RTE_PER_LCORE(_lcore_id) = cpu;
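+ /* Non pmd threads get lcore 0 in thread_set_nonpmd() below, so a
+ * pmd thread must never claim it. */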
return 0;
}
void
thread_set_nonpmd(void)
{
- /* We cannot have RTE_MAX_LCORE pmd threads, because lcore_id 0 is reserved
- * for non pmd threads */
- BUILD_ASSERT(NR_PMD_THREADS < RTE_MAX_LCORE);
/* We have to use 0 to allow non pmd threads to perform certain DPDK
* operations, like rte_eth_dev_configure(). */
RTE_PER_LCORE(_lcore_id) = 0;