+ ovs_mutex_unlock(&pmd->poll_mutex);
+ if (found) {
+ dp_netdev_reload_pmd__(pmd);
+ }
+}
+
+/* Deletes all rx queues of 'port' from all pmd threads of dp and
+ * reloads them if needed. */
+static void
+dp_netdev_del_port_from_all_pmds(struct dp_netdev *dp,
+ struct dp_netdev_port *port)
+{
+ int numa_id = netdev_get_numa_id(port->netdev);
+ struct dp_netdev_pmd_thread *pmd;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->numa_id == numa_id) {
+ dp_netdev_del_port_from_pmd(port, pmd);
+ }
+ }
+}
+
+/* Returns PMD thread from this numa node with fewer rx queues to poll.
+ * Returns NULL if there is no PMD threads on this numa node.
+ * Can be called safely only by main thread. */
+static struct dp_netdev_pmd_thread *
+dp_netdev_less_loaded_pmd_on_numa(struct dp_netdev *dp, int numa_id)
+{
+ int min_cnt = -1;
+ struct dp_netdev_pmd_thread *pmd, *res = NULL;
+
+ CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
+ if (pmd->numa_id == numa_id
+ && (min_cnt > pmd->poll_cnt || res == NULL)) {
+ min_cnt = pmd->poll_cnt;
+ res = pmd;
+ }
+ }
+
+ return res;
+}
+
+/* Adds rx queue to poll_list of PMD thread. */
+static void
+dp_netdev_add_rxq_to_pmd(struct dp_netdev_pmd_thread *pmd,
+ struct dp_netdev_port *port, struct netdev_rxq *rx)
+ OVS_REQUIRES(pmd->poll_mutex)
+{
+ struct rxq_poll *poll = xmalloc(sizeof *poll);
+
+ port_ref(port);
+ poll->port = port;
+ poll->rx = rx;
+
+ list_push_back(&pmd->poll_list, &poll->node);
+ pmd->poll_cnt++;
+}
+
+/* Distributes all rx queues of 'port' between all PMD threads and reloads
+ * them if needed. */
+static void
+dp_netdev_add_port_to_pmds(struct dp_netdev *dp, struct dp_netdev_port *port)
+{
+ int numa_id = netdev_get_numa_id(port->netdev);
+ struct dp_netdev_pmd_thread *pmd;
+ struct hmapx to_reload;
+ struct hmapx_node *node;
+ int i;
+
+ hmapx_init(&to_reload);
+ /* Cannot create pmd threads for invalid numa node. */
+ ovs_assert(ovs_numa_numa_id_is_valid(numa_id));
+
+ for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
+ pmd = dp_netdev_less_loaded_pmd_on_numa(dp, numa_id);
+ if (!pmd) {
+ /* There is no pmd threads on this numa node. */
+ dp_netdev_set_pmds_on_numa(dp, numa_id);
+ /* Assigning of rx queues done. */
+ break;
+ }
+
+ ovs_mutex_lock(&pmd->poll_mutex);
+ dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
+ ovs_mutex_unlock(&pmd->poll_mutex);
+
+ hmapx_add(&to_reload, pmd);
+ }
+
+ HMAPX_FOR_EACH (node, &to_reload) {
+ pmd = (struct dp_netdev_pmd_thread *) node->data;
+ dp_netdev_reload_pmd__(pmd);
+ }
+
+ hmapx_destroy(&to_reload);