+ /* Unref all ports and free poll_list. */
+ dp_netdev_pmd_clear_poll_list(pmd);
+
+ /* Purges the 'pmd''s flows after stopping the thread, but before
+ * destroying the flows, so that the flow stats can be collected. */
+ if (dp->dp_purge_cb) {
+ dp->dp_purge_cb(dp->dp_purge_aux, pmd->core_id);
+ }
+ cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0));
+ dp_netdev_pmd_unref(pmd);
+}
+
+/* Destroys all pmd threads. */
+static void
+dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ struct dp_netdev_pmd_thread **pmd_list;
+ size_t k = 0, n_pmds;
+
+ n_pmds = cmap_count(&dp->poll_threads);
+ pmd_list = xcalloc(n_pmds, sizeof *pmd_list);
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ /* We cannot call dp_netdev_del_pmd(), since it alters
+ * 'dp->poll_threads' (while we're iterating it) and it
+ * might quiesce. */
+ ovs_assert(k < n_pmds);
+ pmd_list[k++] = pmd;
+ }
+
+ for (size_t i = 0; i < k; i++) {
+ dp_netdev_del_pmd(dp, pmd_list[i]);
+ }
+ free(pmd_list);
+}
+
+/* Deletes all pmd threads on numa node 'numa_id' and
+ * fixes tx_qids of other threads to keep them sequential. */
+static void
+dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+ struct dp_netdev_pmd_thread *pmd;
+ int n_pmds_on_numa, n_pmds;
+ int *free_idx, k = 0;
+ struct dp_netdev_pmd_thread **pmd_list;
+
+ n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
+ free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
+ pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ /* We cannot call dp_netdev_del_pmd(), since it alters
+ * 'dp->poll_threads' (while we're iterating it) and it
+ * might quiesce. */
+ if (pmd->numa_id == numa_id) {
+ ovs_assert(k < n_pmds_on_numa);
+ atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
+ pmd_list[k] = pmd;
+ k++;
+ }
+ }
+
+ for (int i = 0; i < k; i++) {
+ dp_netdev_del_pmd(dp, pmd_list[i]);
+ }
+
+ n_pmds = get_n_pmd_threads(dp);
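+ /* With the threads above deleted, some remaining threads may hold a
+ * tx_qid that is now >= n_pmds.  Hand those threads the ids freed by
+ * the deleted threads so that tx_qids stay sequential. */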
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ int old_tx_qid;
+
+ atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
+
+ if (old_tx_qid >= n_pmds) {
+ int new_tx_qid = free_idx[--k];
+
+ atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
+ }
+ }
+
+ free(pmd_list);
+ free(free_idx);
+}
+
+ /* Removes all rx queue entries from 'pmd''s poll_list and unrefs the
+ * corresponding ports. */
+static void
+dp_netdev_pmd_clear_poll_list(struct dp_netdev_pmd_thread *pmd)
+{
+ struct rxq_poll *poll;
+
+ ovs_mutex_lock(&pmd->poll_mutex);
+ LIST_FOR_EACH_POP (poll, node, &pmd->poll_list) {
+ port_unref(poll->port);
+ free(poll);
+ }
+ pmd->poll_cnt = 0;
+ ovs_mutex_unlock(&pmd->poll_mutex);
+}
+
+ /* Removes all rx queues of 'port' from 'pmd''s poll_list and reloads the
+ * pmd thread if the poll_list was changed. */
+static void
+dp_netdev_del_port_from_pmd(struct dp_netdev_port *port,
+ struct dp_netdev_pmd_thread *pmd)
+{
+ struct rxq_poll *poll, *next;
+ bool found = false;
+
+ ovs_mutex_lock(&pmd->poll_mutex);
+ LIST_FOR_EACH_SAFE (poll, next, node, &pmd->poll_list) {
+ if (poll->port == port) {
+ found = true;
+ port_unref(poll->port);
+ list_remove(&poll->node);
+ pmd->poll_cnt--;
+ free(poll);
+ }
+ }
+ ovs_mutex_unlock(&pmd->poll_mutex);
+ if (found) {
+ dp_netdev_reload_pmd__(pmd);
+ }
+}
+
+/* Deletes all rx queues of 'port' from all pmd threads of dp and
+ * reloads them if needed. */
+static void
+dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
+ struct dp_netdev_port *port)
+{
+ int numa_id = netdev_get_numa_id(port->netdev);
+ struct dp_netdev_pmd_thread *pmd;
+
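+ /* Only pmd threads on 'port''s numa node can have its rx queues in their
+ * poll lists, so other threads need not be checked. */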
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->numa_id == numa_id) {
+ dp_netdev_del_port_from_pmd(port, pmd);
+ }
+ }
+}
+
+ /* Returns the PMD thread on numa node 'numa_id' with the fewest rx queues
+ * to poll.  Returns NULL if there are no PMD threads on this numa node.
+ * Can be called safely only by the main thread. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
+{
+ int min_cnt = -1;
+ struct dp_netdev_pmd_thread *pmd, *res = NULL;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->numa_id == numa_id
+ && (min_cnt > pmd->poll_cnt || res == NULL)) {
+ min_cnt = pmd->poll_cnt;
+ res = pmd;
+ }
+ }
+
+ return res;
+}
+
+ /* Adds the rx queue 'rx' of 'port' to 'pmd''s poll_list. */
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_port *port, struct netdev_rxq *rx)
+ OVS_REQUIRES(pmd->poll_mutex)
+{
+ struct rxq_poll *poll = xmalloc(sizeof *poll);
+
+ port_ref(port);
+ poll->port = port;
+ poll->rx = rx;
+
+ list_push_back(&pmd->poll_list, &poll->node);
+ pmd->poll_cnt++;
+}
+
+/* Distributes all rx queues of 'port' between all PMD threads and reloads
+ * them if needed. */
+static void
+dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
+{
+ int numa_id = netdev_get_numa_id(port->netdev);
+ struct dp_netdev_pmd_thread *pmd;
+ struct hmapx to_reload;
+ struct hmapx_node *node;
+ int i;
+
+ hmapx_init(&to_reload);
+ /* Cannot create pmd threads for an invalid numa node. */
+ ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+ if (!pmd) {
+ /* There are no pmd threads on this numa node. */
+ dp_netdev_set_pmds_on_numa(dp, numa_id);
+ /* Assignment of rx queues is done by dp_netdev_set_pmds_on_numa(). */
+ break;
+ }
+
+ ovs_mutex_lock(&pmd->poll_mutex);
+ dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+ ovs_mutex_unlock(&pmd->poll_mutex);
+
+ hmapx_add(&to_reload, pmd);
+ }
+
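+ /* Each pmd thread is reloaded only once, even if several of the port's
+ * rx queues were added to its poll list. */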
+ HMAPX_FOR_EACH (node, &to_reload) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ dp_netdev_reload_pmd__(pmd);
+ }
+
+ hmapx_destroy(&to_reload);
+}
+
+ /* Starts pmd threads for numa node 'numa_id', unless they have already
+ * been created, and distributes the rx queues of that node's pmd ports
+ * among them. */
+static void
+dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id)
+{
+ int n_pmds;
+
+ if (!ovs_numa_numa_id_is_valid(numa_id)) {
+ VLOG_ERR("Cannot create pmd threads: invalid numa id (%d).", numa_id);
+ return;
+ }
+
+ n_pmds = get_n_pmd_threads_on_numa(dp, numa_id);
+
+ /* If pmd threads have already been created for this numa node, do
+ * nothing.  Otherwise, create the pmd threads for it. */
+ if (!n_pmds) {
+ int can_have, n_unpinned, i, index = 0;
+ struct dp_netdev_pmd_thread **pmds;
+ struct dp_netdev_port *port;
+
+ n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id);
+ if (!n_unpinned) {
+ VLOG_ERR("Cannot create pmd threads: no unpinned cores available "
+ "on numa node %d.", numa_id);
+ return;
+ }
+
+ /* If a cpu mask is specified, use all unpinned cores; otherwise try to
+ * create NR_PMD_THREADS pmd threads. */
+ can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS);
+ pmds = xzalloc(can_have * sizeof *pmds);
+ for (i = 0; i < can_have; i++) {
+ unsigned core_id = ovs_numa_get_unpinned_core_on_numa(numa_id);
+ pmds[i] = xzalloc(sizeof **pmds);
+ dp_netdev_configure_pmd(pmds[i], dp, i, core_id, numa_id);
+ }
+
+ /* Distributes the rx queues of this numa node's pmd ports among the
+ * new pmd threads in a round-robin fashion. */
+ CMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_pmd(port->netdev)
+ && netdev_get_numa_id(port->netdev) == numa_id) {
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ /* Make thread-safety analyser happy. */
+ ovs_mutex_lock(&pmds[index]->poll_mutex);
+ dp_netdev_add_rxq_to_pmd(pmds[index], port, port->rxq[i]);
+ ovs_mutex_unlock(&pmds[index]->poll_mutex);
+ index = (index + 1) % can_have;
+ }
+ }
+ }
+
+ /* Actual start of pmd threads. */
+ for (i = 0; i < can_have; i++) {
+ pmds[i]->thread = ovs_thread_create("pmd", pmd_thread_main, pmds[i]);
+ }
+ free(pmds);
+ VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id);
+ }
+}
+
+\f
+/* Called after the pmd threads' configuration has changed.  Restarts the
+ * pmd threads with the new configuration. */
+static void
+dp_netdev_reset_pmd_threads(struct dp_netdev *dp)
+{
+ struct dp_netdev_port *port;
+
+ CMAP_FOR_EACH (port, node, &dp->ports) {
+ if (netdev_is_pmd(port->netdev)) {
+ int numa_id = netdev_get_numa_id(port->netdev);
+
+ dp_netdev_set_pmds_on_numa(dp, numa_id);
+ }
+ }
+}
+
+static char *
+dpif_netdev_get_datapath_version(void)
+{
+ return xstrdup("<built-in>");
+}
+
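+/* Updates 'netdev_flow''s statistics with 'cnt' packets totalling 'size'
+ * bytes: bumps the packet and byte counters, refreshes the last-used
+ * timestamp and accumulates 'tcp_flags'. */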
+static void
+dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size,
+ uint16_t tcp_flags, long long now)
+{
+ uint16_t flags;
+
+ atomic_store_relaxed(&netdev_flow->stats.used, now);
+ non_atomic_ullong_add(&netdev_flow->stats.packet_count, cnt);
+ non_atomic_ullong_add(&netdev_flow->stats.byte_count, size);
+ atomic_read_relaxed(&netdev_flow->stats.tcp_flags, &flags);
+ flags |= tcp_flags;
+ atomic_store_relaxed(&netdev_flow->stats.tcp_flags, flags);
+}
+
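+/* Adds 'cnt' to the per-pmd datapath statistics counter of type 'type'. */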
+static void
+dp_netdev_count_packet(struct dp_netdev_pmd_thread *pmd,
+ enum dp_stat_type type, int cnt)
+{
+ non_atomic_ullong_add(&pmd->stats.n[type], cnt);
+}
+
+static int
+dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet_,
+ struct flow *flow, struct flow_wildcards *wc, ovs_u128 *ufid,
+ enum dpif_upcall_type type, const struct nlattr *userdata,
+ struct ofpbuf *actions, struct ofpbuf *put_actions)
+{
+ struct dp_netdev *dp = pmd->dp;
+ struct flow_tnl orig_tunnel;
+ int err;
+
+ if (OVS_UNLIKELY(!dp->upcall_cb)) {
+ return ENODEV;
+ }
+
+ /* Upcall processing expects the Geneve options to be in the translated
+ * format but we need to retain the raw format for datapath use. */
+ orig_tunnel.flags = flow->tunnel.flags;
+ if (flow->tunnel.flags & FLOW_TNL_F_UDPIF) {
+ orig_tunnel.metadata.present.len = flow->tunnel.metadata.present.len;
+ memcpy(orig_tunnel.metadata.opts.gnv, flow->tunnel.metadata.opts.gnv,
+ flow->tunnel.metadata.present.len);
+ err = tun_metadata_from_geneve_udpif(&orig_tunnel, &orig_tunnel,
+ &flow->tunnel);
+ if (err) {
+ return err;
+ }
+ }
+
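+ /* Rate-limited debug logging of the upcall: formats the flow key and
+ * the packet and prints them at DBG level. */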
+ if (OVS_UNLIKELY(!VLOG_DROP_DBG(&upcall_rl))) {
+ struct ds ds = DS_EMPTY_INITIALIZER;
+ char *packet_str;
+ struct ofpbuf key;
+ struct odp_flow_key_parms odp_parms = {
+ .flow = flow,
+ .mask = &wc->masks,
+ .odp_in_port = flow->in_port.odp_port,
+ .support = dp_netdev_support,
+ };
+
+ ofpbuf_init(&key, 0);
+ odp_flow_key_from_flow(&odp_parms, &key);
+ packet_str = ofp_packet_to_string(dp_packet_data(packet_),
+ dp_packet_size(packet_));
+
+ odp_flow_key_format(key.data, key.size, &ds);
+
+ VLOG_DBG("%s: %s upcall:\n%s\n%s", dp->name,
+ dpif_upcall_type_to_string(type), ds_cstr(&ds), packet_str);
+
+ ofpbuf_uninit(&key);
+ free(packet_str);
+
+ ds_destroy(&ds);
+ }
+
+ err = dp->upcall_cb(packet_, flow, ufid, pmd->core_id, type, userdata,
+ actions, wc, put_actions, dp->upcall_aux);
+ if (err && err != ENOSPC) {
+ return err;
+ }
+
+ /* Translate tunnel metadata masks to datapath format. */
+ if (wc) {
+ if (wc->masks.tunnel.metadata.present.map) {
+ struct geneve_opt opts[TLV_TOT_OPT_SIZE /
+ sizeof(struct geneve_opt)];
+
+ if (orig_tunnel.flags & FLOW_TNL_F_UDPIF) {
+ tun_metadata_to_geneve_udpif_mask(&flow->tunnel,
+ &wc->masks.tunnel,
+ orig_tunnel.metadata.opts.gnv,
+ orig_tunnel.metadata.present.len,
+ opts);
+ } else {
+ orig_tunnel.metadata.present.len = 0;
+ }
+
+ memset(&wc->masks.tunnel.metadata, 0,
+ sizeof wc->masks.tunnel.metadata);
+ memcpy(&wc->masks.tunnel.metadata.opts.gnv, opts,
+ orig_tunnel.metadata.present.len);
+ }
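+ /* The options length itself is always matched exactly. */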
+ wc->masks.tunnel.metadata.present.len = 0xff;
+ }
+
+ /* Restore tunnel metadata. We need to use the saved options to ensure
+ * that any unknown options are not lost. The generated mask will have
+ * the same structure, matching on types and lengths but wildcarding
+ * option data we don't care about. */
+ if (orig_tunnel.flags & FLOW_TNL_F_UDPIF) {
+ memcpy(&flow->tunnel.metadata.opts.gnv, orig_tunnel.metadata.opts.gnv,
+ orig_tunnel.metadata.present.len);
+ flow->tunnel.metadata.present.len = orig_tunnel.metadata.present.len;
+ flow->tunnel.flags |= FLOW_TNL_F_UDPIF;
+ }
+
+ return err;