X-Git-Url: http://git.cascardo.eti.br/?a=blobdiff_plain;f=lib%2Fdpif-netdev.c;h=78ed4863353d104a76fe5de41bb8a77f8541ba7f;hb=2bc1bbd27ded5752f438ff35673f38a527ea0915;hp=55712dde1674beb117e1ea70db81d3aec82a2a07;hpb=27bbe15dec4e1862396b5c4d265f0ced71b49930;p=cascardo%2Fovs.git diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c index 55712dde1..78ed48633 100644 --- a/lib/dpif-netdev.c +++ b/lib/dpif-netdev.c @@ -15,7 +15,7 @@ */ #include -#include "dpif.h" +#include "dpif-netdev.h" #include #include @@ -31,16 +31,19 @@ #include #include -#include "classifier.h" +#include "cmap.h" #include "csum.h" +#include "dp-packet.h" #include "dpif.h" #include "dpif-provider.h" #include "dummy.h" #include "dynamic-string.h" +#include "fat-rwlock.h" #include "flow.h" -#include "hmap.h" +#include "cmap.h" #include "latch.h" #include "list.h" +#include "match.h" #include "meta-flow.h" #include "netdev.h" #include "netdev-dpdk.h" @@ -50,24 +53,24 @@ #include "odp-util.h" #include "ofp-print.h" #include "ofpbuf.h" +#include "ovs-numa.h" #include "ovs-rcu.h" #include "packets.h" #include "poll-loop.h" +#include "pvector.h" #include "random.h" #include "seq.h" #include "shash.h" #include "sset.h" #include "timeval.h" +#include "tnl-arp-cache.h" #include "unixctl.h" #include "util.h" -#include "vlog.h" +#include "openvswitch/vlog.h" VLOG_DEFINE_THIS_MODULE(dpif_netdev); -/* By default, choose a priority in the middle. */ -#define NETDEV_RULE_PRIORITY 0x8000 - -#define NR_THREADS 1 +#define FLOW_DUMP_MAX_BATCH 50 /* Use per thread recirc_depth to prevent recirculation loop. */ #define MAX_RECIRC_DEPTH 5 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) @@ -75,11 +78,6 @@ DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0) /* Configuration parameters. */ enum { MAX_FLOWS = 65536 }; /* Maximum number of flows in flow table. */ -/* Queues. */ -enum { MAX_QUEUE_LEN = 128 }; /* Maximum number of packets per queue. */ -enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 }; -BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN)); - /* Protects against changes to 'dp_netdevs'. */ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER; @@ -87,27 +85,85 @@ static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER; static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex) = SHASH_INITIALIZER(&dp_netdevs); -struct dp_netdev_upcall { - struct dpif_upcall upcall; /* Queued upcall information. */ - struct ofpbuf buf; /* ofpbuf instance for upcall.packet. */ +static struct vlog_rate_limit upcall_rl = VLOG_RATE_LIMIT_INIT(600, 600); + +/* Stores a miniflow with inline values */ + +struct netdev_flow_key { + uint32_t hash; /* Hash function differs for different users. */ + uint32_t len; /* Length of the following miniflow (incl. map). */ + struct miniflow mf; + uint64_t buf[FLOW_MAX_PACKET_U64S - MINI_N_INLINE]; }; -/* A queue passing packets from a struct dp_netdev to its clients (handlers). +/* Exact match cache for frequently used flows + * + * The cache uses a 32-bit hash of the packet (which can be the RSS hash) to + * search its entries for a miniflow that matches exactly the miniflow of the + * packet. It stores the 'dpcls_rule' (rule) that matches the miniflow. + * + * A cache entry holds a reference to its 'dp_netdev_flow'. + * + * A miniflow with a given hash can be in one of EM_FLOW_HASH_SEGS different + * entries. The 32-bit hash is split into EM_FLOW_HASH_SEGS values (each of + * them is EM_FLOW_HASH_SHIFT bits wide and the remainder is thrown away). 
Each + * value is the index of a cache entry where the miniflow could be. * * * Thread-safety * ============= * - * Any access at all requires the owning 'dp_netdev''s queue_rwlock and - * its own mutex. */ -struct dp_netdev_queue { - struct ovs_mutex mutex; - struct seq *seq; /* Incremented whenever a packet is queued. */ - struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED; - unsigned int head OVS_GUARDED; - unsigned int tail OVS_GUARDED; + * Each pmd_thread has its own private exact match cache. + * If dp_netdev_input is not called from a pmd thread, a mutex is used. + */ + +#define EM_FLOW_HASH_SHIFT 10 +#define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT) +#define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1) +#define EM_FLOW_HASH_SEGS 2 + +struct emc_entry { + struct dp_netdev_flow *flow; + struct netdev_flow_key key; /* key.hash used for emc hash value. */ +}; + +struct emc_cache { + struct emc_entry entries[EM_FLOW_HASH_ENTRIES]; + int sweep_idx; /* For emc_cache_slow_sweep(). */ +}; + +/* Iterate in the exact match cache through every entry that might contain a + * miniflow with hash 'HASH'. */ +#define EMC_FOR_EACH_POS_WITH_HASH(EMC, CURRENT_ENTRY, HASH) \ + for (uint32_t i__ = 0, srch_hash__ = (HASH); \ + (CURRENT_ENTRY) = &(EMC)->entries[srch_hash__ & EM_FLOW_HASH_MASK], \ + i__ < EM_FLOW_HASH_SEGS; \ + i__++, srch_hash__ >>= EM_FLOW_HASH_SHIFT) + +/* Simple non-wildcarding single-priority classifier. */ + +struct dpcls { + struct cmap subtables_map; + struct pvector subtables; +}; + +/* A rule to be inserted to the classifier. */ +struct dpcls_rule { + struct cmap_node cmap_node; /* Within struct dpcls_subtable 'rules'. */ + struct netdev_flow_key *mask; /* Subtable's mask. */ + struct netdev_flow_key flow; /* Matching key. */ + /* 'flow' must be the last field, additional space is allocated here. */ }; +static void dpcls_init(struct dpcls *); +static void dpcls_destroy(struct dpcls *); +static void dpcls_insert(struct dpcls *, struct dpcls_rule *, + const struct netdev_flow_key *mask); +static void dpcls_remove(struct dpcls *, struct dpcls_rule *); +static bool dpcls_lookup(const struct dpcls *cls, + const struct netdev_flow_key keys[], + struct dpcls_rule **rules, size_t cnt); + /* Datapath based on the network device interface from netdev.h. * * @@ -120,81 +176,66 @@ struct dp_netdev_queue { * Acquisition order is, from outermost to innermost: * * dp_netdev_mutex (global) - * port_rwlock - * flow_mutex - * cls.rwlock - * queue_rwlock + * port_mutex */ struct dp_netdev { const struct dpif_class *const class; const char *const name; + struct dpif *dpif; struct ovs_refcount ref_cnt; atomic_flag destroyed; - /* Flows. - * - * Readers of 'cls' and 'flow_table' must take a 'cls->rwlock' read lock. - * - * Writers of 'cls' and 'flow_table' must take the 'flow_mutex' and then - * the 'cls->rwlock' write lock. (The outer 'flow_mutex' allows writers to - * atomically perform multiple operations on 'cls' and 'flow_table'.) - */ - struct ovs_mutex flow_mutex; - struct classifier cls; /* Classifier. Protected by cls.rwlock. */ - struct hmap flow_table OVS_GUARDED; /* Flow table. */ - - /* Queues. - * - * 'queue_rwlock' protects the modification of 'handler_queues' and - * 'n_handlers'. The queue elements are protected by its - * 'handler_queues''s mutex. */ - struct fat_rwlock queue_rwlock; - struct dp_netdev_queue *handler_queues; - uint32_t n_handlers; - - /* Statistics. - * - * ovsthread_stats is internally synchronized. 
*/ - struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */ - /* Ports. * - * Any lookup into 'ports' or any access to the dp_netdev_ports found - * through 'ports' requires taking 'port_rwlock'. */ - struct ovs_rwlock port_rwlock; - struct hmap ports OVS_GUARDED; + * Protected by RCU. Take the mutex to add or remove ports. */ + struct ovs_mutex port_mutex; + struct cmap ports; struct seq *port_seq; /* Incremented whenever a port changes. */ - /* Forwarding threads. */ - struct latch exit_latch; - struct pmd_thread *pmd_threads; - size_t n_pmd_threads; - int pmd_count; + /* Protects access to ofproto-dpif-upcall interface during revalidator + * thread synchronization. */ + struct fat_rwlock upcall_rwlock; + upcall_callback *upcall_cb; /* Callback function for executing upcalls. */ + void *upcall_aux; + + /* Stores all 'struct dp_netdev_pmd_thread's. */ + struct cmap poll_threads; + + /* Protects the access of the 'struct dp_netdev_pmd_thread' + * instance for non-pmd thread. */ + struct ovs_mutex non_pmd_mutex; + + /* Each pmd thread will store its pointer to + * 'struct dp_netdev_pmd_thread' in 'per_pmd_key'. */ + ovsthread_key_t per_pmd_key; + + /* Number of rx queues for each dpdk interface and the cpu mask + * for pin of pmd threads. */ + size_t n_dpdk_rxqs; + char *pmd_cmask; + uint64_t last_tnl_conf_seq; }; static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp, - odp_port_t) - OVS_REQ_RDLOCK(dp->port_rwlock); + odp_port_t); enum dp_stat_type { - DP_STAT_HIT, /* Packets that matched in the flow table. */ + DP_STAT_EXACT_HIT, /* Packets that had an exact match (emc). */ + DP_STAT_MASKED_HIT, /* Packets that matched in the flow table. */ DP_STAT_MISS, /* Packets that did not match. */ DP_STAT_LOST, /* Packets not passed up to the client. */ DP_N_STATS }; -/* Contained by struct dp_netdev's 'stats' member. */ -struct dp_netdev_stats { - struct ovs_mutex mutex; /* Protects 'n'. */ - - /* Indexed by DP_STAT_*, protected by 'mutex'. */ - unsigned long long int n[DP_N_STATS] OVS_GUARDED; +enum pmd_cycles_counter_type { + PMD_CYCLES_POLLING, /* Cycles spent polling NICs. */ + PMD_CYCLES_PROCESSING, /* Cycles spent processing packets */ + PMD_N_CYCLES }; - /* A port in a netdev-based datapath. */ struct dp_netdev_port { - struct hmap_node node; /* Node in dp_netdev's 'ports'. */ + struct cmap_node node; /* Node in dp_netdev's 'ports'. */ odp_port_t port_no; struct netdev *netdev; struct netdev_saved_flags *sf; @@ -203,14 +244,22 @@ struct dp_netdev_port { char *type; /* Port type as requested by user. */ }; -/* A flow in dp_netdev's 'flow_table'. +/* Contained by struct dp_netdev_flow's 'stats' member. */ +struct dp_netdev_flow_stats { + atomic_llong used; /* Last used time, in monotonic msecs. */ + atomic_ullong packet_count; /* Number of packets matched. */ + atomic_ullong byte_count; /* Number of bytes matched. */ + atomic_uint16_t tcp_flags; /* Bitwise-OR of seen tcp_flags values. */ +}; + +/* A flow in 'dp_netdev_pmd_thread's 'flow_table'. * * * Thread-safety * ============= * * Except near the beginning or ending of its lifespan, rule 'rule' belongs to - * its dp_netdev's classifier. The text below calls this classifier 'cls'. + * its pmd thread's classifier. The text below calls this classifier 'cls'. 
* * Motivation * ---------- @@ -229,76 +278,62 @@ struct dp_netdev_port { * Rules * ----- * - * A flow 'flow' may be accessed without a risk of being freed by code that - * holds a read-lock or write-lock on 'cls->rwlock' or that owns a reference to - * 'flow->ref_cnt' (or both). Code that needs to hold onto a flow for a while - * should take 'cls->rwlock', find the flow it needs, increment 'flow->ref_cnt' - * with dpif_netdev_flow_ref(), and drop 'cls->rwlock'. + * A flow 'flow' may be accessed without a risk of being freed during an RCU + * grace period. Code that needs to hold onto a flow for a while + * should try incrementing 'flow->ref_cnt' with dp_netdev_flow_ref(). * * 'flow->ref_cnt' protects 'flow' from being freed. It doesn't protect the - * flow from being deleted from 'cls' (that's 'cls->rwlock') and it doesn't - * protect members of 'flow' from modification (that's 'flow->mutex'). - * - * 'flow->mutex' protects the members of 'flow' from modification. It doesn't - * protect the flow from being deleted from 'cls' (that's 'cls->rwlock') and it - * doesn't prevent the flow from being freed (that's 'flow->ref_cnt'). + * flow from being deleted from 'cls' and it doesn't protect members of 'flow' + * from modification. * * Some members, marked 'const', are immutable. Accessing other members * requires synchronization, as noted in more detail below. */ struct dp_netdev_flow { - /* Packet classification. */ - const struct cls_rule cr; /* In owning dp_netdev's 'cls'. */ + bool dead; /* Hash table index by unmasked flow. */ - const struct hmap_node node; /* In owning dp_netdev's 'flow_table'. */ - const struct flow flow; /* The flow that created this entry. */ - - /* Protects members marked OVS_GUARDED. - * - * Acquire after datapath's flow_mutex. */ - struct ovs_mutex mutex OVS_ACQ_AFTER(dp_netdev_mutex); + const struct cmap_node node; /* In owning dp_netdev_pmd_thread's */ + /* 'flow_table'. */ + const ovs_u128 ufid; /* Unique flow identifier. */ + const struct flow flow; /* Unmasked flow that created this entry. */ + const int pmd_id; /* The 'core_id' of pmd thread owning this */ + /* flow. */ + + /* Number of references. + * The classifier owns one reference. + * Any thread trying to keep a rule from being freed should hold its own + * reference. */ + struct ovs_refcount ref_cnt; - /* Statistics. - * - * Reading or writing these members requires 'mutex'. */ - struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */ + /* Statistics. */ + struct dp_netdev_flow_stats stats; - /* Actions. - * - * Reading 'actions' requires 'mutex'. - * Writing 'actions' requires 'mutex' and (to allow for transactions) the - * datapath's flow_mutex. */ + /* Actions. */ OVSRCU_TYPE(struct dp_netdev_actions *) actions; -}; - -static void dp_netdev_flow_free(struct dp_netdev_flow *); -/* Contained by struct dp_netdev_flow's 'stats' member. */ -struct dp_netdev_flow_stats { - struct ovs_mutex mutex; /* Guards all the other members. */ - - long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */ - long long int packet_count OVS_GUARDED; /* Number of packets matched. */ - long long int byte_count OVS_GUARDED; /* Number of bytes matched. */ - uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */ + /* Packet classification. */ + struct dpcls_rule cr; /* In owning dp_netdev's 'cls'. */ + /* 'cr' must be the last member. 
*/ }; +static void dp_netdev_flow_unref(struct dp_netdev_flow *); +static bool dp_netdev_flow_ref(struct dp_netdev_flow *); +static int dpif_netdev_flow_from_nlattrs(const struct nlattr *, uint32_t, + struct flow *); + /* A set of datapath actions within a "struct dp_netdev_flow". * * * Thread-safety * ============= * - * A struct dp_netdev_actions 'actions' may be accessed without a risk of being - * freed by code that holds a read-lock or write-lock on 'flow->mutex' (where - * 'flow' is the dp_netdev_flow for which 'flow->actions == actions') or that - * owns a reference to 'actions->ref_cnt' (or both). */ + * A struct dp_netdev_actions 'actions' is protected with RCU. */ struct dp_netdev_actions { /* These members are immutable: they do not change during the struct's * lifetime. */ - struct nlattr *actions; /* Sequence of OVS_ACTION_ATTR_* attributes. */ unsigned int size; /* Size of 'actions', in bytes. */ + struct nlattr actions[]; /* Sequence of OVS_ACTION_ATTR_* attributes. */ }; struct dp_netdev_actions *dp_netdev_actions_create(const struct nlattr *, @@ -307,24 +342,84 @@ struct dp_netdev_actions *dp_netdev_flow_get_actions( const struct dp_netdev_flow *); static void dp_netdev_actions_free(struct dp_netdev_actions *); +/* Contained by struct dp_netdev_pmd_thread's 'stats' member. */ +struct dp_netdev_pmd_stats { + /* Indexed by DP_STAT_*. */ + atomic_ullong n[DP_N_STATS]; +}; + +/* Contained by struct dp_netdev_pmd_thread's 'cycle' member. */ +struct dp_netdev_pmd_cycles { + /* Indexed by PMD_CYCLES_*. */ + atomic_ullong n[PMD_N_CYCLES]; +}; + /* PMD: Poll modes drivers. PMD accesses devices via polling to eliminate * the performance overhead of interrupt processing. Therefore netdev can * not implement rx-wait for these devices. dpif-netdev needs to poll * these device to check for recv buffer. pmd-thread does polling for - * devices assigned to itself thread. + * devices assigned to itself. * * DPDK used PMD for accessing NIC. * - * A thread that receives packets from PMD ports, looks them up in the flow - * table, and executes the actions it finds. - **/ -struct pmd_thread { + * Note, instance with cpu core id NON_PMD_CORE_ID will be reserved for + * I/O of all non-pmd threads. There will be no actual thread created + * for the instance. + * + * Each struct has its own flow table and classifier. Packets received + * from managed ports are looked up in the corresponding pmd thread's + * flow table, and are executed with the found actions. + * */ +struct dp_netdev_pmd_thread { struct dp_netdev *dp; + struct ovs_refcount ref_cnt; /* Every reference must be refcount'ed. */ + struct cmap_node node; /* In 'dp->poll_threads'. */ + + pthread_cond_t cond; /* For synchronizing pmd thread reload. */ + struct ovs_mutex cond_mutex; /* Mutex for condition variable. */ + + /* Per thread exact-match cache. Note, the instance for cpu core + * NON_PMD_CORE_ID can be accessed by multiple threads, and thusly + * need to be protected (e.g. by 'dp_netdev_mutex'). All other + * instances will only be accessed by its own pmd thread. */ + struct emc_cache flow_cache; + + /* Classifier and Flow-Table. + * + * Writers of 'flow_table' must take the 'flow_mutex'. Corresponding + * changes to 'cls' must be made while still holding the 'flow_mutex'. + */ + struct ovs_mutex flow_mutex; + struct dpcls cls; + struct cmap flow_table OVS_GUARDED; /* Flow table. */ + + /* Statistics. */ + struct dp_netdev_pmd_stats stats; + + /* Cycles counters */ + struct dp_netdev_pmd_cycles cycles; + + /* Used to count cicles. 
See 'cycles_counter_end()' */ + unsigned long long last_cycles; + + struct latch exit_latch; /* For terminating the pmd thread. */ + atomic_uint change_seq; /* For reloading pmd ports. */ pthread_t thread; - int id; - atomic_uint change_seq; + int index; /* Idx of this pmd thread among pmd*/ + /* threads on same numa node. */ + int core_id; /* CPU core id of this pmd thread. */ + int numa_id; /* numa node id of this pmd thread. */ + + /* Only a pmd thread can write on its own 'cycles' and 'stats'. + * The main thread keeps 'stats_zero' and 'cycles_zero' as base + * values and subtracts them from 'stats' and 'cycles' before + * reporting to the user */ + unsigned long long stats_zero[DP_N_STATS]; + uint64_t cycles_zero[PMD_N_CYCLES]; }; +#define PMD_INITIAL_SEQ 1 + /* Interface to netdev-based datapath. */ struct dpif_netdev { struct dpif dpif; @@ -333,37 +428,88 @@ struct dpif_netdev { }; static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no, - struct dp_netdev_port **portp) - OVS_REQ_RDLOCK(dp->port_rwlock); + struct dp_netdev_port **portp); static int get_port_by_name(struct dp_netdev *dp, const char *devname, - struct dp_netdev_port **portp) - OVS_REQ_RDLOCK(dp->port_rwlock); + struct dp_netdev_port **portp); static void dp_netdev_free(struct dp_netdev *) OVS_REQUIRES(dp_netdev_mutex); -static void dp_netdev_flow_flush(struct dp_netdev *); static int do_add_port(struct dp_netdev *dp, const char *devname, const char *type, odp_port_t port_no) - OVS_REQ_WRLOCK(dp->port_rwlock); -static int do_del_port(struct dp_netdev *dp, odp_port_t port_no) - OVS_REQ_WRLOCK(dp->port_rwlock); -static void dp_netdev_destroy_all_queues(struct dp_netdev *dp) - OVS_REQ_WRLOCK(dp->queue_rwlock); + OVS_REQUIRES(dp->port_mutex); +static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *) + OVS_REQUIRES(dp->port_mutex); static int dpif_netdev_open(const struct dpif_class *, const char *name, bool create, struct dpif **); -static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *, - int queue_no, int type, - const struct miniflow *, - const struct nlattr *userdata); -static void dp_netdev_execute_actions(struct dp_netdev *dp, - const struct miniflow *, - struct ofpbuf *, bool may_steal, - struct pkt_metadata *, +static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd, + struct dp_packet **, int c, + bool may_steal, const struct nlattr *actions, size_t actions_len); -static void dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet, - struct pkt_metadata *); +static void dp_netdev_input(struct dp_netdev_pmd_thread *, + struct dp_packet **, int cnt); + +static void dp_netdev_disable_upcall(struct dp_netdev *); +void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, + struct dp_netdev *dp, int index, + int core_id, int numa_id); +static void dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_set_nonpmd(struct dp_netdev *dp); +static struct dp_netdev_pmd_thread *dp_netdev_get_pmd(struct dp_netdev *dp, + int core_id); +static struct dp_netdev_pmd_thread * +dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos); +static void dp_netdev_destroy_all_pmds(struct dp_netdev *dp); +static void dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id); +static void dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id); +static void dp_netdev_reset_pmd_threads(struct dp_netdev *dp); +static bool 
dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd); +static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd); + +static inline bool emc_entry_alive(struct emc_entry *ce); +static void emc_clear_entry(struct emc_entry *ce); + +static void +emc_cache_init(struct emc_cache *flow_cache) +{ + int i; + + BUILD_ASSERT(offsetof(struct miniflow, inline_values) == sizeof(uint64_t)); + + flow_cache->sweep_idx = 0; + for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) { + flow_cache->entries[i].flow = NULL; + flow_cache->entries[i].key.hash = 0; + flow_cache->entries[i].key.len + = offsetof(struct miniflow, inline_values); + miniflow_initialize(&flow_cache->entries[i].key.mf, + flow_cache->entries[i].key.buf); + } +} + +static void +emc_cache_uninit(struct emc_cache *flow_cache) +{ + int i; + + for (i = 0; i < ARRAY_SIZE(flow_cache->entries); i++) { + emc_clear_entry(&flow_cache->entries[i]); + } +} -static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n); +/* Check and clear dead flow references slowly (one entry at each + * invocation). */ +static void +emc_cache_slow_sweep(struct emc_cache *flow_cache) +{ + struct emc_entry *entry = &flow_cache->entries[flow_cache->sweep_idx]; + + if (!emc_entry_alive(entry)) { + emc_clear_entry(entry); + } + flow_cache->sweep_idx = (flow_cache->sweep_idx + 1) & EM_FLOW_HASH_MASK; +} static struct dpif_netdev * dpif_netdev_cast(const struct dpif *dpif) @@ -377,14 +523,197 @@ get_dp_netdev(const struct dpif *dpif) { return dpif_netdev_cast(dpif)->dp; } + +enum pmd_info_type { + PMD_INFO_SHOW_STATS, /* show how cpu cycles are spent */ + PMD_INFO_CLEAR_STATS /* set the cycles count to 0 */ +}; + +static void +pmd_info_show_stats(struct ds *reply, + struct dp_netdev_pmd_thread *pmd, + unsigned long long stats[DP_N_STATS], + uint64_t cycles[PMD_N_CYCLES]) +{ + unsigned long long total_packets = 0; + uint64_t total_cycles = 0; + int i; + + /* These loops subtracts reference values ('*_zero') from the counters. + * Since loads and stores are relaxed, it might be possible for a '*_zero' + * value to be more recent than the current value we're reading from the + * counter. This is not a big problem, since these numbers are not + * supposed to be too accurate, but we should at least make sure that + * the result is not negative. */ + for (i = 0; i < DP_N_STATS; i++) { + if (stats[i] > pmd->stats_zero[i]) { + stats[i] -= pmd->stats_zero[i]; + } else { + stats[i] = 0; + } + + if (i != DP_STAT_LOST) { + /* Lost packets are already included in DP_STAT_MISS */ + total_packets += stats[i]; + } + } + + for (i = 0; i < PMD_N_CYCLES; i++) { + if (cycles[i] > pmd->cycles_zero[i]) { + cycles[i] -= pmd->cycles_zero[i]; + } else { + cycles[i] = 0; + } + + total_cycles += cycles[i]; + } + + ds_put_cstr(reply, (pmd->core_id == NON_PMD_CORE_ID) + ? 
"main thread" : "pmd thread"); + + if (pmd->numa_id != OVS_NUMA_UNSPEC) { + ds_put_format(reply, " numa_id %d", pmd->numa_id); + } + if (pmd->core_id != OVS_CORE_UNSPEC) { + ds_put_format(reply, " core_id %d", pmd->core_id); + } + ds_put_cstr(reply, ":\n"); + + ds_put_format(reply, + "\temc hits:%llu\n\tmegaflow hits:%llu\n" + "\tmiss:%llu\n\tlost:%llu\n", + stats[DP_STAT_EXACT_HIT], stats[DP_STAT_MASKED_HIT], + stats[DP_STAT_MISS], stats[DP_STAT_LOST]); + + if (total_cycles == 0) { + return; + } + + ds_put_format(reply, + "\tpolling cycles:%"PRIu64" (%.02f%%)\n" + "\tprocessing cycles:%"PRIu64" (%.02f%%)\n", + cycles[PMD_CYCLES_POLLING], + cycles[PMD_CYCLES_POLLING] / (double)total_cycles * 100, + cycles[PMD_CYCLES_PROCESSING], + cycles[PMD_CYCLES_PROCESSING] / (double)total_cycles * 100); + + if (total_packets == 0) { + return; + } + + ds_put_format(reply, + "\tavg cycles per packet: %.02f (%"PRIu64"/%llu)\n", + total_cycles / (double)total_packets, + total_cycles, total_packets); + + ds_put_format(reply, + "\tavg processing cycles per packet: " + "%.02f (%"PRIu64"/%llu)\n", + cycles[PMD_CYCLES_PROCESSING] / (double)total_packets, + cycles[PMD_CYCLES_PROCESSING], total_packets); +} + +static void +pmd_info_clear_stats(struct ds *reply OVS_UNUSED, + struct dp_netdev_pmd_thread *pmd, + unsigned long long stats[DP_N_STATS], + uint64_t cycles[PMD_N_CYCLES]) +{ + int i; + + /* We cannot write 'stats' and 'cycles' (because they're written by other + * threads) and we shouldn't change 'stats' (because they're used to count + * datapath stats, which must not be cleared here). Instead, we save the + * current values and subtract them from the values to be displayed in the + * future */ + for (i = 0; i < DP_N_STATS; i++) { + pmd->stats_zero[i] = stats[i]; + } + for (i = 0; i < PMD_N_CYCLES; i++) { + pmd->cycles_zero[i] = cycles[i]; + } +} + +static void +dpif_netdev_pmd_info(struct unixctl_conn *conn, int argc, const char *argv[], + void *aux) +{ + struct ds reply = DS_EMPTY_INITIALIZER; + struct dp_netdev_pmd_thread *pmd; + struct dp_netdev *dp = NULL; + enum pmd_info_type type = *(enum pmd_info_type *) aux; + + ovs_mutex_lock(&dp_netdev_mutex); + + if (argc == 2) { + dp = shash_find_data(&dp_netdevs, argv[1]); + } else if (shash_count(&dp_netdevs) == 1) { + /* There's only one datapath */ + dp = shash_first(&dp_netdevs)->data; + } + + if (!dp) { + ovs_mutex_unlock(&dp_netdev_mutex); + unixctl_command_reply_error(conn, + "please specify an existing datapath"); + return; + } + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + unsigned long long stats[DP_N_STATS]; + uint64_t cycles[PMD_N_CYCLES]; + int i; + + /* Read current stats and cycle counters */ + for (i = 0; i < ARRAY_SIZE(stats); i++) { + atomic_read_relaxed(&pmd->stats.n[i], &stats[i]); + } + for (i = 0; i < ARRAY_SIZE(cycles); i++) { + atomic_read_relaxed(&pmd->cycles.n[i], &cycles[i]); + } + + if (type == PMD_INFO_CLEAR_STATS) { + pmd_info_clear_stats(&reply, pmd, stats, cycles); + } else if (type == PMD_INFO_SHOW_STATS) { + pmd_info_show_stats(&reply, pmd, stats, cycles); + } + } + + ovs_mutex_unlock(&dp_netdev_mutex); + + unixctl_command_reply(conn, ds_cstr(&reply)); + ds_destroy(&reply); +} + +static int +dpif_netdev_init(void) +{ + static enum pmd_info_type show_aux = PMD_INFO_SHOW_STATS, + clear_aux = PMD_INFO_CLEAR_STATS; + + unixctl_command_register("dpif-netdev/pmd-stats-show", "[dp]", + 0, 1, dpif_netdev_pmd_info, + (void *)&show_aux); + unixctl_command_register("dpif-netdev/pmd-stats-clear", "[dp]", + 0, 1, 
dpif_netdev_pmd_info, + (void *)&clear_aux); + return 0; +} static int -dpif_netdev_enumerate(struct sset *all_dps) +dpif_netdev_enumerate(struct sset *all_dps, + const struct dpif_class *dpif_class) { struct shash_node *node; ovs_mutex_lock(&dp_netdev_mutex); SHASH_FOR_EACH(node, &dp_netdevs) { + struct dp_netdev *dp = node->data; + if (dpif_class != dp->class) { + /* 'dp_netdevs' contains both "netdev" and "dummy" dpifs. + * If the class doesn't match, skip this dpif. */ + continue; + } sset_add(all_dps, node->name); } ovs_mutex_unlock(&dp_netdev_mutex); @@ -426,7 +755,7 @@ create_dpif_netdev(struct dp_netdev *dp) * Return ODPP_NONE on failure. */ static odp_port_t choose_port(struct dp_netdev *dp, const char *name) - OVS_REQ_RDLOCK(dp->port_rwlock) + OVS_REQUIRES(dp->port_mutex) { uint32_t port_no; @@ -480,27 +809,34 @@ create_dp_netdev(const char *name, const struct dpif_class *class, ovs_refcount_init(&dp->ref_cnt); atomic_flag_clear(&dp->destroyed); - ovs_mutex_init(&dp->flow_mutex); - classifier_init(&dp->cls, NULL); - hmap_init(&dp->flow_table); + ovs_mutex_init(&dp->port_mutex); + cmap_init(&dp->ports); + dp->port_seq = seq_create(); + fat_rwlock_init(&dp->upcall_rwlock); - fat_rwlock_init(&dp->queue_rwlock); + /* Disable upcalls by default. */ + dp_netdev_disable_upcall(dp); + dp->upcall_aux = NULL; + dp->upcall_cb = NULL; - ovsthread_stats_init(&dp->stats); + cmap_init(&dp->poll_threads); + ovs_mutex_init_recursive(&dp->non_pmd_mutex); + ovsthread_key_create(&dp->per_pmd_key, NULL); - ovs_rwlock_init(&dp->port_rwlock); - hmap_init(&dp->ports); - dp->port_seq = seq_create(); - latch_init(&dp->exit_latch); + /* Reserves the core NON_PMD_CORE_ID for all non-pmd threads. */ + ovs_numa_try_pin_core_specific(NON_PMD_CORE_ID); + dp_netdev_set_nonpmd(dp); + dp->n_dpdk_rxqs = NR_QUEUE; - ovs_rwlock_wrlock(&dp->port_rwlock); + ovs_mutex_lock(&dp->port_mutex); error = do_add_port(dp, name, "internal", ODPP_LOCAL); - ovs_rwlock_unlock(&dp->port_rwlock); + ovs_mutex_unlock(&dp->port_mutex); if (error) { dp_netdev_free(dp); return error; } + dp->last_tnl_conf_seq = seq_read(tnl_conf_seq); *dpp = dp; return 0; } @@ -523,6 +859,7 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, } if (!error) { *dpifp = create_dpif_netdev(dp); + dp->dpif = *dpifp; } ovs_mutex_unlock(&dp_netdev_mutex); @@ -530,22 +867,15 @@ dpif_netdev_open(const struct dpif_class *class, const char *name, } static void -dp_netdev_purge_queues(struct dp_netdev *dp) - OVS_REQ_WRLOCK(dp->queue_rwlock) +dp_netdev_destroy_upcall_lock(struct dp_netdev *dp) + OVS_NO_THREAD_SAFETY_ANALYSIS { - int i; + /* Check that upcalls are disabled, i.e. 
that the rwlock is taken */ + ovs_assert(fat_rwlock_tryrdlock(&dp->upcall_rwlock)); - for (i = 0; i < dp->n_handlers; i++) { - struct dp_netdev_queue *q = &dp->handler_queues[i]; - - ovs_mutex_lock(&q->mutex); - while (q->tail != q->head) { - struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK]; - ofpbuf_uninit(&u->upcall.packet); - ofpbuf_uninit(&u->buf); - } - ovs_mutex_unlock(&q->mutex); - } + /* Before freeing a lock we should release it */ + fat_rwlock_unlock(&dp->upcall_rwlock); + fat_rwlock_destroy(&dp->upcall_rwlock); } /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp' @@ -554,40 +884,28 @@ static void dp_netdev_free(struct dp_netdev *dp) OVS_REQUIRES(dp_netdev_mutex) { - struct dp_netdev_port *port, *next; - struct dp_netdev_stats *bucket; - int i; + struct dp_netdev_port *port; shash_find_and_delete(&dp_netdevs, dp->name); - dp_netdev_set_pmd_threads(dp, 0); - free(dp->pmd_threads); - - dp_netdev_flow_flush(dp); - ovs_rwlock_wrlock(&dp->port_rwlock); - HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) { - do_del_port(dp, port->port_no); - } - ovs_rwlock_unlock(&dp->port_rwlock); + dp_netdev_destroy_all_pmds(dp); + cmap_destroy(&dp->poll_threads); + ovs_mutex_destroy(&dp->non_pmd_mutex); + ovsthread_key_delete(dp->per_pmd_key); - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) { - ovs_mutex_destroy(&bucket->mutex); - free_cacheline(bucket); + ovs_mutex_lock(&dp->port_mutex); + CMAP_FOR_EACH (port, node, &dp->ports) { + do_del_port(dp, port); } - ovsthread_stats_destroy(&dp->stats); + ovs_mutex_unlock(&dp->port_mutex); - fat_rwlock_wrlock(&dp->queue_rwlock); - dp_netdev_destroy_all_queues(dp); - fat_rwlock_unlock(&dp->queue_rwlock); + seq_destroy(dp->port_seq); + cmap_destroy(&dp->ports); - fat_rwlock_destroy(&dp->queue_rwlock); + /* Upcalls must be disabled at this point */ + dp_netdev_destroy_upcall_lock(dp); - classifier_destroy(&dp->cls); - hmap_destroy(&dp->flow_table); - ovs_mutex_destroy(&dp->flow_mutex); - seq_destroy(dp->port_seq); - hmap_destroy(&dp->ports); - latch_destroy(&dp->exit_latch); + free(dp->pmd_cmask); free(CONST_CAST(char *, dp->name)); free(dp); } @@ -599,7 +917,7 @@ dp_netdev_unref(struct dp_netdev *dp) /* Take dp_netdev_mutex so that, if dp->ref_cnt falls to zero, we can't * get a new reference to 'dp' through the 'dp_netdevs' shash. */ ovs_mutex_lock(&dp_netdev_mutex); - if (ovs_refcount_unref(&dp->ref_cnt) == 1) { + if (ovs_refcount_unref_relaxed(&dp->ref_cnt) == 1) { dp_netdev_free(dp); } ovs_mutex_unlock(&dp_netdev_mutex); @@ -621,7 +939,7 @@ dpif_netdev_destroy(struct dpif *dpif) struct dp_netdev *dp = get_dp_netdev(dpif); if (!atomic_flag_test_and_set(&dp->destroyed)) { - if (ovs_refcount_unref(&dp->ref_cnt) == 1) { + if (ovs_refcount_unref_relaxed(&dp->ref_cnt) == 1) { /* Can't happen: 'dpif' still owns a reference to 'dp'. */ OVS_NOT_REACHED(); } @@ -630,24 +948,40 @@ dpif_netdev_destroy(struct dpif *dpif) return 0; } +/* Add 'n' to the atomic variable 'var' non-atomically and using relaxed + * load/store semantics. While the increment is not atomic, the load and + * store operations are, making it impossible to read inconsistent values. + * + * This is used to update thread local stats counters. 
*/ +static void +non_atomic_ullong_add(atomic_ullong *var, unsigned long long n) +{ + unsigned long long tmp; + + atomic_read_relaxed(var, &tmp); + tmp += n; + atomic_store_relaxed(var, tmp); +} + static int dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) { struct dp_netdev *dp = get_dp_netdev(dpif); - struct dp_netdev_stats *bucket; - size_t i; - - fat_rwlock_rdlock(&dp->cls.rwlock); - stats->n_flows = hmap_count(&dp->flow_table); - fat_rwlock_unlock(&dp->cls.rwlock); - - stats->n_hit = stats->n_missed = stats->n_lost = 0; - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) { - ovs_mutex_lock(&bucket->mutex); - stats->n_hit += bucket->n[DP_STAT_HIT]; - stats->n_missed += bucket->n[DP_STAT_MISS]; - stats->n_lost += bucket->n[DP_STAT_LOST]; - ovs_mutex_unlock(&bucket->mutex); + struct dp_netdev_pmd_thread *pmd; + + stats->n_flows = stats->n_hit = stats->n_missed = stats->n_lost = 0; + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + unsigned long long n; + stats->n_flows += cmap_count(&pmd->flow_table); + + atomic_read_relaxed(&pmd->stats.n[DP_STAT_MASKED_HIT], &n); + stats->n_hit += n; + atomic_read_relaxed(&pmd->stats.n[DP_STAT_EXACT_HIT], &n); + stats->n_hit += n; + atomic_read_relaxed(&pmd->stats.n[DP_STAT_MISS], &n); + stats->n_missed += n; + atomic_read_relaxed(&pmd->stats.n[DP_STAT_LOST], &n); + stats->n_lost += n; } stats->n_masks = UINT32_MAX; stats->n_mask_hit = UINT64_MAX; @@ -656,22 +990,42 @@ dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats) } static void -dp_netdev_reload_pmd_threads(struct dp_netdev *dp) +dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd) { - int i; + int old_seq; + + if (pmd->core_id == NON_PMD_CORE_ID) { + return; + } + + ovs_mutex_lock(&pmd->cond_mutex); + atomic_add_relaxed(&pmd->change_seq, 1, &old_seq); + ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex); + ovs_mutex_unlock(&pmd->cond_mutex); +} - for (i = 0; i < dp->n_pmd_threads; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; - int id; +/* Causes all pmd threads to reload its tx/rx devices. + * Must be called after adding/removing ports. */ +static void +dp_netdev_reload_pmds(struct dp_netdev *dp) +{ + struct dp_netdev_pmd_thread *pmd; + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + dp_netdev_reload_pmd__(pmd); + } +} - atomic_add(&f->change_seq, 1, &id); - } +static uint32_t +hash_port_no(odp_port_t port_no) +{ + return hash_int(odp_to_u32(port_no), 0); } static int do_add_port(struct dp_netdev *dp, const char *devname, const char *type, odp_port_t port_no) - OVS_REQ_WRLOCK(dp->port_rwlock) + OVS_REQUIRES(dp->port_mutex) { struct netdev_saved_flags *sf; struct dp_netdev_port *port; @@ -681,7 +1035,10 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, int error; int i; - /* XXX reject devices already in some dp_netdev. */ + /* Reject devices already in 'dp'. */ + if (!get_port_by_name(dp, devname, &port)) { + return EEXIST; + } /* Open and validate network device. */ open_type = dpif_netdev_port_open_type(dp->class, type); @@ -698,6 +1055,21 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, return EINVAL; } + if (netdev_is_pmd(netdev)) { + int n_cores = ovs_numa_get_n_cores(); + + if (n_cores == OVS_CORE_UNSPEC) { + VLOG_ERR("%s, cannot get cpu core info", devname); + return ENOENT; + } + /* There can only be ovs_numa_get_n_cores() pmd threads, + * so creates a txq for each. 
*/ + error = netdev_set_multiq(netdev, n_cores, dp->n_dpdk_rxqs); + if (error && (error != EOPNOTSUPP)) { + VLOG_ERR("%s, cannot set multiq", devname); + return errno; + } + } port = xzalloc(sizeof *port); port->port_no = port_no; port->netdev = netdev; @@ -710,6 +1082,9 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, VLOG_ERR("%s: cannot receive packets on this network device (%s)", devname, ovs_strerror(errno)); netdev_close(netdev); + free(port->type); + free(port->rxq); + free(port); return error; } } @@ -720,20 +1095,20 @@ do_add_port(struct dp_netdev *dp, const char *devname, const char *type, netdev_rxq_close(port->rxq[i]); } netdev_close(netdev); + free(port->type); free(port->rxq); free(port); return error; } port->sf = sf; - if (netdev_is_pmd(netdev)) { - dp->pmd_count++; - dp_netdev_set_pmd_threads(dp, NR_THREADS); - dp_netdev_reload_pmd_threads(dp); - } ovs_refcount_init(&port->ref_cnt); + cmap_insert(&dp->ports, &port->node, hash_port_no(port_no)); - hmap_insert(&dp->ports, &port->node, hash_int(odp_to_u32(port_no), 0)); + if (netdev_is_pmd(netdev)) { + dp_netdev_set_pmds_on_numa(dp, netdev_get_numa_id(netdev)); + dp_netdev_reload_pmds(dp); + } seq_change(dp->port_seq); return 0; @@ -749,7 +1124,7 @@ dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev, odp_port_t port_no; int error; - ovs_rwlock_wrlock(&dp->port_rwlock); + ovs_mutex_lock(&dp->port_mutex); dpif_port = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf); if (*port_nop != ODPP_NONE) { port_no = *port_nop; @@ -762,7 +1137,7 @@ dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev, *port_nop = port_no; error = do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no); } - ovs_rwlock_unlock(&dp->port_rwlock); + ovs_mutex_unlock(&dp->port_mutex); return error; } @@ -773,9 +1148,18 @@ dpif_netdev_port_del(struct dpif *dpif, odp_port_t port_no) struct dp_netdev *dp = get_dp_netdev(dpif); int error; - ovs_rwlock_wrlock(&dp->port_rwlock); - error = port_no == ODPP_LOCAL ? 
EINVAL : do_del_port(dp, port_no); - ovs_rwlock_unlock(&dp->port_rwlock); + ovs_mutex_lock(&dp->port_mutex); + if (port_no == ODPP_LOCAL) { + error = EINVAL; + } else { + struct dp_netdev_port *port; + + error = get_port_by_number(dp, port_no, &port); + if (!error) { + do_del_port(dp, port); + } + } + ovs_mutex_unlock(&dp->port_mutex); return error; } @@ -788,12 +1172,10 @@ is_valid_port_number(odp_port_t port_no) static struct dp_netdev_port * dp_netdev_lookup_port(const struct dp_netdev *dp, odp_port_t port_no) - OVS_REQ_RDLOCK(dp->port_rwlock) { struct dp_netdev_port *port; - HMAP_FOR_EACH_IN_BUCKET (port, node, hash_int(odp_to_u32(port_no), 0), - &dp->ports) { + CMAP_FOR_EACH_WITH_HASH (port, node, hash_port_no(port_no), &dp->ports) { if (port->port_no == port_no) { return port; } @@ -804,7 +1186,6 @@ dp_netdev_lookup_port(const struct dp_netdev *dp, odp_port_t port_no) static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no, struct dp_netdev_port **portp) - OVS_REQ_RDLOCK(dp->port_rwlock) { if (!is_valid_port_number(port_no)) { *portp = NULL; @@ -823,18 +1204,30 @@ port_ref(struct dp_netdev_port *port) } } +static bool +port_try_ref(struct dp_netdev_port *port) +{ + if (port) { + return ovs_refcount_try_ref_rcu(&port->ref_cnt); + } + + return false; +} + static void port_unref(struct dp_netdev_port *port) { - if (port && ovs_refcount_unref(&port->ref_cnt) == 1) { + if (port && ovs_refcount_unref_relaxed(&port->ref_cnt) == 1) { + int n_rxq = netdev_n_rxq(port->netdev); int i; netdev_close(port->netdev); netdev_restore_flags(port->sf); - for (i = 0; i < netdev_n_rxq(port->netdev); i++) { + for (i = 0; i < n_rxq; i++) { netdev_rxq_close(port->rxq[i]); } + free(port->rxq); free(port->type); free(port); } @@ -843,11 +1236,11 @@ port_unref(struct dp_netdev_port *port) static int get_port_by_name(struct dp_netdev *dp, const char *devname, struct dp_netdev_port **portp) - OVS_REQ_RDLOCK(dp->port_rwlock) + OVS_REQUIRES(dp->port_mutex) { struct dp_netdev_port *port; - HMAP_FOR_EACH (port, node, &dp->ports) { + CMAP_FOR_EACH (port, node, &dp->ports) { if (!strcmp(netdev_get_name(port->netdev), devname)) { *portp = port; return 0; @@ -857,25 +1250,56 @@ get_port_by_name(struct dp_netdev *dp, } static int -do_del_port(struct dp_netdev *dp, odp_port_t port_no) - OVS_REQ_WRLOCK(dp->port_rwlock) +get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id) +{ + struct dp_netdev_pmd_thread *pmd; + int n_pmds = 0; + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + if (pmd->numa_id == numa_id) { + n_pmds++; + } + } + + return n_pmds; +} + +/* Returns 'true' if there is a port with pmd netdev and the netdev + * is on numa node 'numa_id'. */ +static bool +has_pmd_port_for_numa(struct dp_netdev *dp, int numa_id) { struct dp_netdev_port *port; - int error; - error = get_port_by_number(dp, port_no, &port); - if (error) { - return error; + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev) + && netdev_get_numa_id(port->netdev) == numa_id) { + return true; + } } - hmap_remove(&dp->ports, &port->node); + return false; +} + + +static void +do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port) + OVS_REQUIRES(dp->port_mutex) +{ + cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no)); seq_change(dp->port_seq); if (netdev_is_pmd(port->netdev)) { - dp_netdev_reload_pmd_threads(dp); + int numa_id = netdev_get_numa_id(port->netdev); + + /* If there is no netdev on the numa node, deletes the pmd threads + * for that numa. Else, just reloads the queues. 
*/ + if (!has_pmd_port_for_numa(dp, numa_id)) { + dp_netdev_del_pmds_on_numa(dp, numa_id); + } + dp_netdev_reload_pmds(dp); } port_unref(port); - return 0; } static void @@ -895,12 +1319,10 @@ dpif_netdev_port_query_by_number(const struct dpif *dpif, odp_port_t port_no, struct dp_netdev_port *port; int error; - ovs_rwlock_rdlock(&dp->port_rwlock); error = get_port_by_number(dp, port_no, &port); if (!error && dpif_port) { answer_port_query(port, dpif_port); } - ovs_rwlock_unlock(&dp->port_rwlock); return error; } @@ -913,12 +1335,12 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname, struct dp_netdev_port *port; int error; - ovs_rwlock_rdlock(&dp->port_rwlock); + ovs_mutex_lock(&dp->port_mutex); error = get_port_by_name(dp, devname, &port); if (!error && dpif_port) { answer_port_query(port, dpif_port); } - ovs_rwlock_unlock(&dp->port_rwlock); + ovs_mutex_unlock(&dp->port_mutex); return error; } @@ -926,60 +1348,64 @@ dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname, static void dp_netdev_flow_free(struct dp_netdev_flow *flow) { - struct dp_netdev_flow_stats *bucket; - size_t i; + dp_netdev_actions_free(dp_netdev_flow_get_actions(flow)); + free(flow); +} - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) { - ovs_mutex_destroy(&bucket->mutex); - free_cacheline(bucket); +static void dp_netdev_flow_unref(struct dp_netdev_flow *flow) +{ + if (ovs_refcount_unref_relaxed(&flow->ref_cnt) == 1) { + ovsrcu_postpone(dp_netdev_flow_free, flow); } - ovsthread_stats_destroy(&flow->stats); +} - cls_rule_destroy(CONST_CAST(struct cls_rule *, &flow->cr)); - dp_netdev_actions_free(dp_netdev_flow_get_actions(flow)); - ovs_mutex_destroy(&flow->mutex); - free(flow); +static uint32_t +dp_netdev_flow_hash(const ovs_u128 *ufid) +{ + return ufid->u32[0]; } static void -dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow) - OVS_REQ_WRLOCK(dp->cls.rwlock) - OVS_REQUIRES(dp->flow_mutex) +dp_netdev_pmd_remove_flow(struct dp_netdev_pmd_thread *pmd, + struct dp_netdev_flow *flow) + OVS_REQUIRES(pmd->flow_mutex) { - struct cls_rule *cr = CONST_CAST(struct cls_rule *, &flow->cr); - struct hmap_node *node = CONST_CAST(struct hmap_node *, &flow->node); + struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node); - classifier_remove(&dp->cls, cr); - hmap_remove(&dp->flow_table, node); - ovsrcu_postpone(dp_netdev_flow_free, flow); + dpcls_remove(&pmd->cls, &flow->cr); + cmap_remove(&pmd->flow_table, node, dp_netdev_flow_hash(&flow->ufid)); + flow->dead = true; + + dp_netdev_flow_unref(flow); } static void -dp_netdev_flow_flush(struct dp_netdev *dp) +dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd) { - struct dp_netdev_flow *netdev_flow, *next; + struct dp_netdev_flow *netdev_flow; - ovs_mutex_lock(&dp->flow_mutex); - fat_rwlock_wrlock(&dp->cls.rwlock); - HMAP_FOR_EACH_SAFE (netdev_flow, next, node, &dp->flow_table) { - dp_netdev_remove_flow(dp, netdev_flow); + ovs_mutex_lock(&pmd->flow_mutex); + CMAP_FOR_EACH (netdev_flow, node, &pmd->flow_table) { + dp_netdev_pmd_remove_flow(pmd, netdev_flow); } - fat_rwlock_unlock(&dp->cls.rwlock); - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); } static int dpif_netdev_flow_flush(struct dpif *dpif) { struct dp_netdev *dp = get_dp_netdev(dpif); + struct dp_netdev_pmd_thread *pmd; + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + dp_netdev_pmd_flow_flush(pmd); + } - dp_netdev_flow_flush(dp); return 0; } struct dp_netdev_port_state { - uint32_t bucket; - 
uint32_t offset; + struct cmap_position position; char *name; }; @@ -996,11 +1422,10 @@ dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_, { struct dp_netdev_port_state *state = state_; struct dp_netdev *dp = get_dp_netdev(dpif); - struct hmap_node *node; + struct cmap_node *node; int retval; - ovs_rwlock_rdlock(&dp->port_rwlock); - node = hmap_at_position(&dp->ports, &state->bucket, &state->offset); + node = cmap_next_position(&dp->ports, &state->position); if (node) { struct dp_netdev_port *port; @@ -1016,7 +1441,6 @@ dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_, } else { retval = EOF; } - ovs_rwlock_unlock(&dp->port_rwlock); return retval; } @@ -1057,36 +1481,290 @@ dpif_netdev_port_poll_wait(const struct dpif *dpif_) } static struct dp_netdev_flow * -dp_netdev_flow_cast(const struct cls_rule *cr) +dp_netdev_flow_cast(const struct dpcls_rule *cr) { return cr ? CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL; } -static struct dp_netdev_flow * -dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key) - OVS_EXCLUDED(dp->cls.rwlock) +static bool dp_netdev_flow_ref(struct dp_netdev_flow *flow) { - struct dp_netdev_flow *netdev_flow; - struct cls_rule *rule; + return ovs_refcount_try_ref_rcu(&flow->ref_cnt); +} - fat_rwlock_rdlock(&dp->cls.rwlock); - rule = classifier_lookup_miniflow_first(&dp->cls, key); - netdev_flow = dp_netdev_flow_cast(rule); - fat_rwlock_unlock(&dp->cls.rwlock); +/* netdev_flow_key utilities. + * + * netdev_flow_key is basically a miniflow. We use these functions + * (netdev_flow_key_clone, netdev_flow_key_equal, ...) instead of the miniflow + * functions (miniflow_clone_inline, miniflow_equal, ...), because: + * + * - Since we are dealing exclusively with miniflows created by + * miniflow_extract(), if the map is different the miniflow is different. + * Therefore we can be faster by comparing the map and the miniflow in a + * single memcmp(). + * _ netdev_flow_key's miniflow has always inline values. + * - These functions can be inlined by the compiler. + * + * The following assertions make sure that what we're doing with miniflow is + * safe + */ +BUILD_ASSERT_DECL(offsetof(struct miniflow, inline_values) + == sizeof(uint64_t)); + +/* Given the number of bits set in the miniflow map, returns the size of the + * 'netdev_flow_key.mf' */ +static inline uint32_t +netdev_flow_key_size(uint32_t flow_u32s) +{ + return offsetof(struct miniflow, inline_values) + + MINIFLOW_VALUES_SIZE(flow_u32s); +} + +static inline bool +netdev_flow_key_equal(const struct netdev_flow_key *a, + const struct netdev_flow_key *b) +{ + /* 'b->len' may be not set yet. */ + return a->hash == b->hash && !memcmp(&a->mf, &b->mf, a->len); +} + +/* Used to compare 'netdev_flow_key' in the exact match cache to a miniflow. + * The maps are compared bitwise, so both 'key->mf' 'mf' must have been + * generated by miniflow_extract. */ +static inline bool +netdev_flow_key_equal_mf(const struct netdev_flow_key *key, + const struct miniflow *mf) +{ + return !memcmp(&key->mf, mf, key->len); +} + +static inline void +netdev_flow_key_clone(struct netdev_flow_key *dst, + const struct netdev_flow_key *src) +{ + memcpy(dst, src, + offsetof(struct netdev_flow_key, mf) + src->len); +} + +/* Slow. 
*/ +static void +netdev_flow_key_from_flow(struct netdev_flow_key *dst, + const struct flow *src) +{ + struct dp_packet packet; + uint64_t buf_stub[512 / 8]; + + miniflow_initialize(&dst->mf, dst->buf); + + dp_packet_use_stub(&packet, buf_stub, sizeof buf_stub); + pkt_metadata_from_flow(&packet.md, src); + flow_compose(&packet, src); + miniflow_extract(&packet, &dst->mf); + dp_packet_uninit(&packet); + + dst->len = netdev_flow_key_size(count_1bits(dst->mf.map)); + dst->hash = 0; /* Not computed yet. */ +} + +/* Initialize a netdev_flow_key 'mask' from 'match'. */ +static inline void +netdev_flow_mask_init(struct netdev_flow_key *mask, + const struct match *match) +{ + const uint64_t *mask_u64 = (const uint64_t *) &match->wc.masks; + uint64_t *dst = mask->mf.inline_values; + uint64_t map, mask_map = 0; + uint32_t hash = 0; + int n; + + /* Only check masks that make sense for the flow. */ + map = flow_wc_map(&match->flow); + + while (map) { + uint64_t rm1bit = rightmost_1bit(map); + int i = raw_ctz(map); + + if (mask_u64[i]) { + mask_map |= rm1bit; + *dst++ = mask_u64[i]; + hash = hash_add64(hash, mask_u64[i]); + } + map -= rm1bit; + } + + mask->mf.values_inline = true; + mask->mf.map = mask_map; + + hash = hash_add64(hash, mask_map); + + n = dst - mask->mf.inline_values; + + mask->hash = hash_finish(hash, n * 8); + mask->len = netdev_flow_key_size(n); +} + +/* Initializes 'dst' as a copy of 'src' masked with 'mask'. */ +static inline void +netdev_flow_key_init_masked(struct netdev_flow_key *dst, + const struct flow *flow, + const struct netdev_flow_key *mask) +{ + uint64_t *dst_u64 = dst->mf.inline_values; + const uint64_t *mask_u64 = mask->mf.inline_values; + uint32_t hash = 0; + uint64_t value; + + dst->len = mask->len; + dst->mf.values_inline = true; + dst->mf.map = mask->mf.map; + + FLOW_FOR_EACH_IN_MAP(value, flow, mask->mf.map) { + *dst_u64 = value & *mask_u64++; + hash = hash_add64(hash, *dst_u64++); + } + dst->hash = hash_finish(hash, (dst_u64 - dst->mf.inline_values) * 8); +} + +/* Iterate through all netdev_flow_key u64 values specified by 'MAP' */ +#define NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(VALUE, KEY, MAP) \ + for (struct mf_for_each_in_map_aux aux__ \ + = { (KEY)->mf.inline_values, (KEY)->mf.map, MAP }; \ + mf_get_next_in_map(&aux__, &(VALUE)); \ + ) + +/* Returns a hash value for the bits of 'key' where there are 1-bits in + * 'mask'. 
*/ +static inline uint32_t +netdev_flow_key_hash_in_mask(const struct netdev_flow_key *key, + const struct netdev_flow_key *mask) +{ + const uint64_t *p = mask->mf.inline_values; + uint32_t hash = 0; + uint64_t key_u64; + + NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(key_u64, key, mask->mf.map) { + hash = hash_add64(hash, key_u64 & *p++); + } + + return hash_finish(hash, (p - mask->mf.inline_values) * 8); +} + +static inline bool +emc_entry_alive(struct emc_entry *ce) +{ + return ce->flow && !ce->flow->dead; +} + +static void +emc_clear_entry(struct emc_entry *ce) +{ + if (ce->flow) { + dp_netdev_flow_unref(ce->flow); + ce->flow = NULL; + } +} + +static inline void +emc_change_entry(struct emc_entry *ce, struct dp_netdev_flow *flow, + const struct netdev_flow_key *key) +{ + if (ce->flow != flow) { + if (ce->flow) { + dp_netdev_flow_unref(ce->flow); + } + + if (dp_netdev_flow_ref(flow)) { + ce->flow = flow; + } else { + ce->flow = NULL; + } + } + if (key) { + netdev_flow_key_clone(&ce->key, key); + } +} + +static inline void +emc_insert(struct emc_cache *cache, const struct netdev_flow_key *key, + struct dp_netdev_flow *flow) +{ + struct emc_entry *to_be_replaced = NULL; + struct emc_entry *current_entry; + + EMC_FOR_EACH_POS_WITH_HASH(cache, current_entry, key->hash) { + if (netdev_flow_key_equal(¤t_entry->key, key)) { + /* We found the entry with the 'mf' miniflow */ + emc_change_entry(current_entry, flow, NULL); + return; + } + + /* Replacement policy: put the flow in an empty (not alive) entry, or + * in the first entry where it can be */ + if (!to_be_replaced + || (emc_entry_alive(to_be_replaced) + && !emc_entry_alive(current_entry)) + || current_entry->key.hash < to_be_replaced->key.hash) { + to_be_replaced = current_entry; + } + } + /* We didn't find the miniflow in the cache. + * The 'to_be_replaced' entry is where the new flow will be stored */ + + emc_change_entry(to_be_replaced, flow, key); +} + +static inline struct dp_netdev_flow * +emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key) +{ + struct emc_entry *current_entry; + + EMC_FOR_EACH_POS_WITH_HASH(cache, current_entry, key->hash) { + if (current_entry->key.hash == key->hash + && emc_entry_alive(current_entry) + && netdev_flow_key_equal_mf(¤t_entry->key, &key->mf)) { + + /* We found the entry with the 'key->mf' miniflow */ + return current_entry->flow; + } + } + + return NULL; +} + +static struct dp_netdev_flow * +dp_netdev_pmd_lookup_flow(const struct dp_netdev_pmd_thread *pmd, + const struct netdev_flow_key *key) +{ + struct dp_netdev_flow *netdev_flow; + struct dpcls_rule *rule; + + dpcls_lookup(&pmd->cls, key, &rule, 1); + netdev_flow = dp_netdev_flow_cast(rule); return netdev_flow; } static struct dp_netdev_flow * -dp_netdev_find_flow(const struct dp_netdev *dp, const struct flow *flow) - OVS_REQ_RDLOCK(dp->cls.rwlock) +dp_netdev_pmd_find_flow(const struct dp_netdev_pmd_thread *pmd, + const ovs_u128 *ufidp, const struct nlattr *key, + size_t key_len) { struct dp_netdev_flow *netdev_flow; + struct flow flow; + ovs_u128 ufid; + + /* If a UFID is not provided, determine one based on the key. 
*/ + if (!ufidp && key && key_len + && !dpif_netdev_flow_from_nlattrs(key, key_len, &flow)) { + dpif_flow_hash(pmd->dp->dpif, &flow, sizeof flow, &ufid); + ufidp = &ufid; + } - HMAP_FOR_EACH_WITH_HASH (netdev_flow, node, flow_hash(flow, 0), - &dp->flow_table) { - if (flow_equal(&netdev_flow->flow, flow)) { - return netdev_flow; + if (ufidp) { + CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, dp_netdev_flow_hash(ufidp), + &pmd->flow_table) { + if (ovs_u128_equal(&netdev_flow->ufid, ufidp)) { + return netdev_flow; + } } } @@ -1094,21 +1772,69 @@ dp_netdev_find_flow(const struct dp_netdev *dp, const struct flow *flow) } static void -get_dpif_flow_stats(struct dp_netdev_flow *netdev_flow, +get_dpif_flow_stats(const struct dp_netdev_flow *netdev_flow_, struct dpif_flow_stats *stats) { - struct dp_netdev_flow_stats *bucket; - size_t i; + struct dp_netdev_flow *netdev_flow; + unsigned long long n; + long long used; + uint16_t flags; + + netdev_flow = CONST_CAST(struct dp_netdev_flow *, netdev_flow_); + + atomic_read_relaxed(&netdev_flow->stats.packet_count, &n); + stats->n_packets = n; + atomic_read_relaxed(&netdev_flow->stats.byte_count, &n); + stats->n_bytes = n; + atomic_read_relaxed(&netdev_flow->stats.used, &used); + stats->used = used; + atomic_read_relaxed(&netdev_flow->stats.tcp_flags, &flags); + stats->tcp_flags = flags; +} + +/* Converts to the dpif_flow format, using 'key_buf' and 'mask_buf' for + * storing the netlink-formatted key/mask. 'key_buf' may be the same as + * 'mask_buf'. Actions will be returned without copying, by relying on RCU to + * protect them. */ +static void +dp_netdev_flow_to_dpif_flow(const struct dp_netdev_flow *netdev_flow, + struct ofpbuf *key_buf, struct ofpbuf *mask_buf, + struct dpif_flow *flow, bool terse) +{ + if (terse) { + memset(flow, 0, sizeof *flow); + } else { + struct flow_wildcards wc; + struct dp_netdev_actions *actions; + size_t offset; + + miniflow_expand(&netdev_flow->cr.mask->mf, &wc.masks); + + /* Key */ + offset = key_buf->size; + flow->key = ofpbuf_tail(key_buf); + odp_flow_key_from_flow(key_buf, &netdev_flow->flow, &wc.masks, + netdev_flow->flow.in_port.odp_port, true); + flow->key_len = key_buf->size - offset; + + /* Mask */ + offset = mask_buf->size; + flow->mask = ofpbuf_tail(mask_buf); + odp_flow_key_from_mask(mask_buf, &wc.masks, &netdev_flow->flow, + odp_to_u32(wc.masks.in_port.odp_port), + SIZE_MAX, true); + flow->mask_len = mask_buf->size - offset; - memset(stats, 0, sizeof *stats); - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) { - ovs_mutex_lock(&bucket->mutex); - stats->n_packets += bucket->packet_count; - stats->n_bytes += bucket->byte_count; - stats->used = MAX(stats->used, bucket->used); - stats->tcp_flags |= bucket->tcp_flags; - ovs_mutex_unlock(&bucket->mutex); + /* Actions */ + actions = dp_netdev_flow_get_actions(netdev_flow); + flow->actions = actions->actions; + flow->actions_len = actions->size; } + + flow->ufid = netdev_flow->ufid; + flow->ufid_present = true; + flow->pmd_id = netdev_flow->pmd_id; + get_dpif_flow_stats(netdev_flow, &flow->stats); } static int @@ -1204,90 +1930,82 @@ dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len, } static int -dpif_netdev_flow_get(const struct dpif *dpif, - const struct nlattr *nl_key, size_t nl_key_len, - struct ofpbuf **actionsp, struct dpif_flow_stats *stats) +dpif_netdev_flow_get(const struct dpif *dpif, const struct dpif_flow_get *get) { struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_flow *netdev_flow; - struct flow key; - 
int error; + struct dp_netdev_pmd_thread *pmd; + int pmd_id = get->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : get->pmd_id; + int error = 0; - error = dpif_netdev_flow_from_nlattrs(nl_key, nl_key_len, &key); - if (error) { - return error; + pmd = dp_netdev_get_pmd(dp, pmd_id); + if (!pmd) { + return EINVAL; } - fat_rwlock_rdlock(&dp->cls.rwlock); - netdev_flow = dp_netdev_find_flow(dp, &key); - fat_rwlock_unlock(&dp->cls.rwlock); - + netdev_flow = dp_netdev_pmd_find_flow(pmd, get->ufid, get->key, + get->key_len); if (netdev_flow) { - if (stats) { - get_dpif_flow_stats(netdev_flow, stats); - } - - if (actionsp) { - struct dp_netdev_actions *actions; - - actions = dp_netdev_flow_get_actions(netdev_flow); - *actionsp = ofpbuf_clone_data(actions->actions, actions->size); - } - } else { + dp_netdev_flow_to_dpif_flow(netdev_flow, get->buffer, get->buffer, + get->flow, false); + } else { error = ENOENT; } + dp_netdev_pmd_unref(pmd); + return error; } -static int -dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *flow, - const struct flow_wildcards *wc, - const struct nlattr *actions, - size_t actions_len) - OVS_REQUIRES(dp->flow_mutex) +static struct dp_netdev_flow * +dp_netdev_flow_add(struct dp_netdev_pmd_thread *pmd, + struct match *match, const ovs_u128 *ufid, + const struct nlattr *actions, size_t actions_len) + OVS_REQUIRES(pmd->flow_mutex) { - struct dp_netdev_flow *netdev_flow; - struct match match; + struct dp_netdev_flow *flow; + struct netdev_flow_key mask; - netdev_flow = xzalloc(sizeof *netdev_flow); - *CONST_CAST(struct flow *, &netdev_flow->flow) = *flow; + netdev_flow_mask_init(&mask, match); + /* Make sure wc does not have metadata. */ + ovs_assert(!(mask.mf.map & (MINIFLOW_MAP(metadata) | MINIFLOW_MAP(regs)))); - ovs_mutex_init(&netdev_flow->mutex); + /* Do not allocate extra space. 
*/ + flow = xmalloc(sizeof *flow - sizeof flow->cr.flow.mf + mask.len); + memset(&flow->stats, 0, sizeof flow->stats); + flow->dead = false; + *CONST_CAST(int *, &flow->pmd_id) = pmd->core_id; + *CONST_CAST(struct flow *, &flow->flow) = match->flow; + *CONST_CAST(ovs_u128 *, &flow->ufid) = *ufid; + ovs_refcount_init(&flow->ref_cnt); + ovsrcu_set(&flow->actions, dp_netdev_actions_create(actions, actions_len)); - ovsthread_stats_init(&netdev_flow->stats); + netdev_flow_key_init_masked(&flow->cr.flow, &match->flow, &mask); + dpcls_insert(&pmd->cls, &flow->cr, &mask); - ovsrcu_set(&netdev_flow->actions, - dp_netdev_actions_create(actions, actions_len)); + cmap_insert(&pmd->flow_table, CONST_CAST(struct cmap_node *, &flow->node), + dp_netdev_flow_hash(&flow->ufid)); - match_init(&match, flow, wc); - cls_rule_init(CONST_CAST(struct cls_rule *, &netdev_flow->cr), - &match, NETDEV_RULE_PRIORITY); - fat_rwlock_wrlock(&dp->cls.rwlock); - classifier_insert(&dp->cls, - CONST_CAST(struct cls_rule *, &netdev_flow->cr)); - hmap_insert(&dp->flow_table, - CONST_CAST(struct hmap_node *, &netdev_flow->node), - flow_hash(flow, 0)); - fat_rwlock_unlock(&dp->cls.rwlock); + if (OVS_UNLIKELY(VLOG_IS_DBG_ENABLED())) { + struct match match; + struct ds ds = DS_EMPTY_INITIALIZER; - return 0; -} + match.flow = flow->flow; + miniflow_expand(&flow->cr.mask->mf, &match.wc.masks); -static void -clear_stats(struct dp_netdev_flow *netdev_flow) -{ - struct dp_netdev_flow_stats *bucket; - size_t i; + ds_put_cstr(&ds, "flow_add: "); + odp_format_ufid(ufid, &ds); + ds_put_cstr(&ds, " "); + match_format(&match, &ds, OFP_DEFAULT_PRIORITY); + ds_put_cstr(&ds, ", actions:"); + format_odp_actions(&ds, actions, actions_len); + + VLOG_DBG_RL(&upcall_rl, "%s", ds_cstr(&ds)); - OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) { - ovs_mutex_lock(&bucket->mutex); - bucket->used = 0; - bucket->packet_count = 0; - bucket->byte_count = 0; - bucket->tcp_flags = 0; - ovs_mutex_unlock(&bucket->mutex); + ds_destroy(&ds); } + + return flow; } static int @@ -1295,33 +2013,51 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) { struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_flow *netdev_flow; - struct flow flow; - struct miniflow miniflow; - struct flow_wildcards wc; + struct netdev_flow_key key; + struct dp_netdev_pmd_thread *pmd; + struct match match; + ovs_u128 ufid; + int pmd_id = put->pmd_id == PMD_ID_NULL ? NON_PMD_CORE_ID : put->pmd_id; int error; - error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &flow); + error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &match.flow); if (error) { return error; } error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len, put->mask, put->mask_len, - &flow, &wc.masks); + &match.flow, &match.wc.masks); if (error) { return error; } - miniflow_init(&miniflow, &flow); - ovs_mutex_lock(&dp->flow_mutex); - netdev_flow = dp_netdev_lookup_flow(dp, &miniflow); + pmd = dp_netdev_get_pmd(dp, pmd_id); + if (!pmd) { + return EINVAL; + } + + /* Must produce a netdev_flow_key for lookup. + * This interface is no longer performance critical, since it is not used + * for upcall processing any more. 
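dp_netdev_flow_add() above sizes each flow allocation so that the masked key stored as the last member occupies only mask.len bytes instead of the worst-case inline buffer. A self-contained sketch of that trailing variable-length member pattern, using made-up toy_* types rather than the OVS structures, is:

/* Illustrative sketch of allocating a struct whose last member is a
 * variable-length key, as dp_netdev_flow_add() does; names are made up. */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define KEY_MAX_U64S 16

struct toy_key {
    uint32_t len;                   /* Bytes actually used in values[]. */
    uint64_t values[KEY_MAX_U64S];  /* Worst-case inline storage. */
};

struct toy_flow {
    int id;
    struct toy_key key;             /* Must be the last member. */
};

/* Allocate a toy_flow that stores only 'len' bytes of key values. */
static struct toy_flow *
toy_flow_create(int id, const uint64_t *values, uint32_t len)
{
    struct toy_flow *flow;

    flow = malloc(sizeof *flow - sizeof flow->key.values + len);
    flow->id = id;
    flow->key.len = len;
    memcpy(flow->key.values, values, len);
    return flow;
}

int
main(void)
{
    uint64_t vals[2] = { 0x1111, 0x2222 };
    struct toy_flow *f = toy_flow_create(7, vals, sizeof vals);

    printf("flow %d, key length %u bytes\n", f->id, (unsigned) f->key.len);
    free(f);
    return 0;
}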
*/ + netdev_flow_key_from_flow(&key, &match.flow); + + if (put->ufid) { + ufid = *put->ufid; + } else { + dpif_flow_hash(dpif, &match.flow, sizeof match.flow, &ufid); + } + + ovs_mutex_lock(&pmd->flow_mutex); + netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &key); if (!netdev_flow) { if (put->flags & DPIF_FP_CREATE) { - if (hmap_count(&dp->flow_table) < MAX_FLOWS) { + if (cmap_count(&pmd->flow_table) < MAX_FLOWS) { if (put->stats) { memset(put->stats, 0, sizeof *put->stats); } - error = dp_netdev_flow_add(dp, &flow, &wc, put->actions, - put->actions_len); + dp_netdev_flow_add(pmd, &match, &ufid, put->actions, + put->actions_len); + error = 0; } else { error = EFBIG; } @@ -1330,7 +2066,7 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) } } else { if (put->flags & DPIF_FP_MODIFY - && flow_equal(&flow, &netdev_flow->flow)) { + && flow_equal(&match.flow, &netdev_flow->flow)) { struct dp_netdev_actions *new_actions; struct dp_netdev_actions *old_actions; @@ -1344,7 +2080,16 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) get_dpif_flow_stats(netdev_flow, put->stats); } if (put->flags & DPIF_FP_ZERO_STATS) { - clear_stats(netdev_flow); + /* XXX: The userspace datapath uses thread local statistics + * (for flows), which should be updated only by the owning + * thread. Since we cannot write on stats memory here, + * we choose not to support this flag. Please note: + * - This feature is currently used only by dpctl commands with + * option --clear. + * - Should the need arise, this operation can be implemented + * by keeping a base value (to be update here) for each + * counter, and subtracting it before outputting the stats */ + error = EOPNOTSUPP; } ovsrcu_postpone(dp_netdev_actions_free, old_actions); @@ -1355,7 +2100,8 @@ dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put) error = EINVAL; } } - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); + dp_netdev_pmd_unref(pmd); return error; } @@ -1365,260 +2111,317 @@ dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del) { struct dp_netdev *dp = get_dp_netdev(dpif); struct dp_netdev_flow *netdev_flow; - struct flow key; - int error; + struct dp_netdev_pmd_thread *pmd; + int pmd_id = del->pmd_id == PMD_ID_NULL ? 
NON_PMD_CORE_ID : del->pmd_id; + int error = 0; - error = dpif_netdev_flow_from_nlattrs(del->key, del->key_len, &key); - if (error) { - return error; + pmd = dp_netdev_get_pmd(dp, pmd_id); + if (!pmd) { + return EINVAL; } - ovs_mutex_lock(&dp->flow_mutex); - fat_rwlock_wrlock(&dp->cls.rwlock); - netdev_flow = dp_netdev_find_flow(dp, &key); + ovs_mutex_lock(&pmd->flow_mutex); + netdev_flow = dp_netdev_pmd_find_flow(pmd, del->ufid, del->key, + del->key_len); if (netdev_flow) { if (del->stats) { get_dpif_flow_stats(netdev_flow, del->stats); } - dp_netdev_remove_flow(dp, netdev_flow); + dp_netdev_pmd_remove_flow(pmd, netdev_flow); } else { error = ENOENT; } - fat_rwlock_unlock(&dp->cls.rwlock); - ovs_mutex_unlock(&dp->flow_mutex); + ovs_mutex_unlock(&pmd->flow_mutex); + dp_netdev_pmd_unref(pmd); return error; } -struct dp_netdev_flow_state { - struct dp_netdev_actions *actions; - struct odputil_keybuf keybuf; - struct odputil_keybuf maskbuf; - struct dpif_flow_stats stats; -}; - -struct dp_netdev_flow_iter { - uint32_t bucket; - uint32_t offset; +struct dpif_netdev_flow_dump { + struct dpif_flow_dump up; + struct cmap_position poll_thread_pos; + struct cmap_position flow_pos; + struct dp_netdev_pmd_thread *cur_pmd; int status; struct ovs_mutex mutex; }; -static void -dpif_netdev_flow_dump_state_init(void **statep) +static struct dpif_netdev_flow_dump * +dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump) { - struct dp_netdev_flow_state *state; - - *statep = state = xmalloc(sizeof *state); - state->actions = NULL; + return CONTAINER_OF(dump, struct dpif_netdev_flow_dump, up); } -static void -dpif_netdev_flow_dump_state_uninit(void *state_) +static struct dpif_flow_dump * +dpif_netdev_flow_dump_create(const struct dpif *dpif_, bool terse) { - struct dp_netdev_flow_state *state = state_; + struct dpif_netdev_flow_dump *dump; - free(state); + dump = xzalloc(sizeof *dump); + dpif_flow_dump_init(&dump->up, dpif_); + dump->up.terse = terse; + ovs_mutex_init(&dump->mutex); + + return &dump->up; } static int -dpif_netdev_flow_dump_start(const struct dpif *dpif OVS_UNUSED, void **iterp) +dpif_netdev_flow_dump_destroy(struct dpif_flow_dump *dump_) { - struct dp_netdev_flow_iter *iter; + struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_); - *iterp = iter = xmalloc(sizeof *iter); - iter->bucket = 0; - iter->offset = 0; - iter->status = 0; - ovs_mutex_init(&iter->mutex); + ovs_mutex_destroy(&dump->mutex); + free(dump); return 0; } -/* XXX the caller must use 'actions' without quiescing */ -static int -dpif_netdev_flow_dump_next(const struct dpif *dpif, void *iter_, void *state_, - const struct nlattr **key, size_t *key_len, - const struct nlattr **mask, size_t *mask_len, - const struct nlattr **actions, size_t *actions_len, - const struct dpif_flow_stats **stats) -{ - struct dp_netdev_flow_iter *iter = iter_; - struct dp_netdev_flow_state *state = state_; - struct dp_netdev *dp = get_dp_netdev(dpif); - struct dp_netdev_flow *netdev_flow; - struct flow_wildcards wc; - int error; - - ovs_mutex_lock(&iter->mutex); - error = iter->status; - if (!error) { - struct hmap_node *node; +struct dpif_netdev_flow_dump_thread { + struct dpif_flow_dump_thread up; + struct dpif_netdev_flow_dump *dump; + struct odputil_keybuf keybuf[FLOW_DUMP_MAX_BATCH]; + struct odputil_keybuf maskbuf[FLOW_DUMP_MAX_BATCH]; +}; - fat_rwlock_rdlock(&dp->cls.rwlock); - node = hmap_at_position(&dp->flow_table, &iter->bucket, &iter->offset); - if (node) { - netdev_flow = CONTAINER_OF(node, struct dp_netdev_flow, node); - 
} - fat_rwlock_unlock(&dp->cls.rwlock); - if (!node) { - iter->status = error = EOF; - } - } - ovs_mutex_unlock(&iter->mutex); - if (error) { - return error; - } +static struct dpif_netdev_flow_dump_thread * +dpif_netdev_flow_dump_thread_cast(struct dpif_flow_dump_thread *thread) +{ + return CONTAINER_OF(thread, struct dpif_netdev_flow_dump_thread, up); +} - minimask_expand(&netdev_flow->cr.match.mask, &wc); +static struct dpif_flow_dump_thread * +dpif_netdev_flow_dump_thread_create(struct dpif_flow_dump *dump_) +{ + struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_); + struct dpif_netdev_flow_dump_thread *thread; - if (key) { - struct ofpbuf buf; + thread = xmalloc(sizeof *thread); + dpif_flow_dump_thread_init(&thread->up, &dump->up); + thread->dump = dump; + return &thread->up; +} - ofpbuf_use_stack(&buf, &state->keybuf, sizeof state->keybuf); - odp_flow_key_from_flow(&buf, &netdev_flow->flow, &wc.masks, - netdev_flow->flow.in_port.odp_port); +static void +dpif_netdev_flow_dump_thread_destroy(struct dpif_flow_dump_thread *thread_) +{ + struct dpif_netdev_flow_dump_thread *thread + = dpif_netdev_flow_dump_thread_cast(thread_); - *key = ofpbuf_data(&buf); - *key_len = ofpbuf_size(&buf); - } + free(thread); +} - if (key && mask) { - struct ofpbuf buf; +static int +dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_, + struct dpif_flow *flows, int max_flows) +{ + struct dpif_netdev_flow_dump_thread *thread + = dpif_netdev_flow_dump_thread_cast(thread_); + struct dpif_netdev_flow_dump *dump = thread->dump; + struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH]; + int n_flows = 0; + int i; - ofpbuf_use_stack(&buf, &state->maskbuf, sizeof state->maskbuf); - odp_flow_key_from_mask(&buf, &wc.masks, &netdev_flow->flow, - odp_to_u32(wc.masks.in_port.odp_port), - SIZE_MAX); + ovs_mutex_lock(&dump->mutex); + if (!dump->status) { + struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif); + struct dp_netdev *dp = get_dp_netdev(&dpif->dpif); + struct dp_netdev_pmd_thread *pmd = dump->cur_pmd; + int flow_limit = MIN(max_flows, FLOW_DUMP_MAX_BATCH); + + /* First call to dump_next(), extracts the first pmd thread. + * If there is no pmd thread, returns immediately. */ + if (!pmd) { + pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos); + if (!pmd) { + ovs_mutex_unlock(&dump->mutex); + return n_flows; - *mask = ofpbuf_data(&buf); - *mask_len = ofpbuf_size(&buf); - } + } + } - if (actions || stats) { - state->actions = NULL; + do { + for (n_flows = 0; n_flows < flow_limit; n_flows++) { + struct cmap_node *node; - if (actions) { - state->actions = dp_netdev_flow_get_actions(netdev_flow); - *actions = state->actions->actions; - *actions_len = state->actions->size; - } + node = cmap_next_position(&pmd->flow_table, &dump->flow_pos); + if (!node) { + break; + } + netdev_flows[n_flows] = CONTAINER_OF(node, + struct dp_netdev_flow, + node); + } + /* When finishing dumping the current pmd thread, moves to + * the next. */ + if (n_flows < flow_limit) { + memset(&dump->flow_pos, 0, sizeof dump->flow_pos); + dp_netdev_pmd_unref(pmd); + pmd = dp_netdev_pmd_get_next(dp, &dump->poll_thread_pos); + if (!pmd) { + dump->status = EOF; + break; + } + } + /* Keeps the reference to next caller. */ + dump->cur_pmd = pmd; - if (stats) { - get_dpif_flow_stats(netdev_flow, &state->stats); - *stats = &state->stats; - } + /* If the current dump is empty, do not exit the loop, since the + * remaining pmds could have flows to be dumped. Just dumps again + * on the new 'pmd'. 
*/ + } while (!n_flows); } + ovs_mutex_unlock(&dump->mutex); - return 0; -} + for (i = 0; i < n_flows; i++) { + struct odputil_keybuf *maskbuf = &thread->maskbuf[i]; + struct odputil_keybuf *keybuf = &thread->keybuf[i]; + struct dp_netdev_flow *netdev_flow = netdev_flows[i]; + struct dpif_flow *f = &flows[i]; + struct ofpbuf key, mask; -static int -dpif_netdev_flow_dump_done(const struct dpif *dpif OVS_UNUSED, void *iter_) -{ - struct dp_netdev_flow_iter *iter = iter_; + ofpbuf_use_stack(&key, keybuf, sizeof *keybuf); + ofpbuf_use_stack(&mask, maskbuf, sizeof *maskbuf); + dp_netdev_flow_to_dpif_flow(netdev_flow, &key, &mask, f, + dump->up.terse); + } - ovs_mutex_destroy(&iter->mutex); - free(iter); - return 0; + return n_flows; } static int dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute) + OVS_NO_THREAD_SAFETY_ANALYSIS { struct dp_netdev *dp = get_dp_netdev(dpif); - struct pkt_metadata *md = &execute->md; - struct { - struct miniflow flow; - uint32_t buf[FLOW_U32S]; - } key; - - if (ofpbuf_size(execute->packet) < ETH_HEADER_LEN || - ofpbuf_size(execute->packet) > UINT16_MAX) { + struct dp_netdev_pmd_thread *pmd; + struct dp_packet *pp; + + if (dp_packet_size(execute->packet) < ETH_HEADER_LEN || + dp_packet_size(execute->packet) > UINT16_MAX) { return EINVAL; } - /* Extract flow key. */ - miniflow_initialize(&key.flow, key.buf); - miniflow_extract(execute->packet, md, &key.flow); + /* Tries finding the 'pmd'. If NULL is returned, that means + * the current thread is a non-pmd thread and should use + * dp_netdev_get_pmd(dp, NON_PMD_CORE_ID). */ + pmd = ovsthread_getspecific(dp->per_pmd_key); + if (!pmd) { + pmd = dp_netdev_get_pmd(dp, NON_PMD_CORE_ID); + } + + /* If the current thread is non-pmd thread, acquires + * the 'non_pmd_mutex'. */ + if (pmd->core_id == NON_PMD_CORE_ID) { + ovs_mutex_lock(&dp->non_pmd_mutex); + ovs_mutex_lock(&dp->port_mutex); + } - ovs_rwlock_rdlock(&dp->port_rwlock); - dp_netdev_execute_actions(dp, &key.flow, execute->packet, false, md, - execute->actions, execute->actions_len); - ovs_rwlock_unlock(&dp->port_rwlock); + pp = execute->packet; + dp_netdev_execute_actions(pmd, &pp, 1, false, execute->actions, + execute->actions_len); + if (pmd->core_id == NON_PMD_CORE_ID) { + dp_netdev_pmd_unref(pmd); + ovs_mutex_unlock(&dp->port_mutex); + ovs_mutex_unlock(&dp->non_pmd_mutex); + } return 0; } static void -dp_netdev_destroy_all_queues(struct dp_netdev *dp) - OVS_REQ_WRLOCK(dp->queue_rwlock) +dpif_netdev_operate(struct dpif *dpif, struct dpif_op **ops, size_t n_ops) { size_t i; - dp_netdev_purge_queues(dp); + for (i = 0; i < n_ops; i++) { + struct dpif_op *op = ops[i]; + + switch (op->type) { + case DPIF_OP_FLOW_PUT: + op->error = dpif_netdev_flow_put(dpif, &op->u.flow_put); + break; + + case DPIF_OP_FLOW_DEL: + op->error = dpif_netdev_flow_del(dpif, &op->u.flow_del); + break; - for (i = 0; i < dp->n_handlers; i++) { - struct dp_netdev_queue *q = &dp->handler_queues[i]; + case DPIF_OP_EXECUTE: + op->error = dpif_netdev_execute(dpif, &op->u.execute); + break; - ovs_mutex_destroy(&q->mutex); - seq_destroy(q->seq); + case DPIF_OP_FLOW_GET: + op->error = dpif_netdev_flow_get(dpif, &op->u.flow_get); + break; + } } - free(dp->handler_queues); - dp->handler_queues = NULL; - dp->n_handlers = 0; } -static void -dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers) - OVS_REQ_WRLOCK(dp->queue_rwlock) +/* Returns true if the configuration for rx queues or cpu mask + * is changed. 
*/ +static bool +pmd_config_changed(const struct dp_netdev *dp, size_t rxqs, const char *cmask) { - if (dp->n_handlers != n_handlers) { - size_t i; - - dp_netdev_destroy_all_queues(dp); - - dp->n_handlers = n_handlers; - dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues); - - for (i = 0; i < n_handlers; i++) { - struct dp_netdev_queue *q = &dp->handler_queues[i]; - - ovs_mutex_init(&q->mutex); - q->seq = seq_create(); + if (dp->n_dpdk_rxqs != rxqs) { + return true; + } else { + if (dp->pmd_cmask != NULL && cmask != NULL) { + return strcmp(dp->pmd_cmask, cmask); + } else { + return (dp->pmd_cmask != NULL || cmask != NULL); } } } +/* Resets pmd threads if the configuration for 'rxq's or cpu mask changes. */ static int -dpif_netdev_recv_set(struct dpif *dpif, bool enable) +dpif_netdev_pmd_set(struct dpif *dpif, unsigned int n_rxqs, const char *cmask) { struct dp_netdev *dp = get_dp_netdev(dpif); - if ((dp->handler_queues != NULL) == enable) { - return 0; - } + if (pmd_config_changed(dp, n_rxqs, cmask)) { + struct dp_netdev_port *port; - fat_rwlock_wrlock(&dp->queue_rwlock); - if (!enable) { - dp_netdev_destroy_all_queues(dp); - } else { - dp_netdev_refresh_queues(dp, 1); - } - fat_rwlock_unlock(&dp->queue_rwlock); + dp_netdev_destroy_all_pmds(dp); - return 0; -} + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev)) { + int i, err; -static int -dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers) -{ - struct dp_netdev *dp = get_dp_netdev(dpif); + /* Closes the existing 'rxq's. */ + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { + netdev_rxq_close(port->rxq[i]); + port->rxq[i] = NULL; + } + + /* Sets the new rx queue config. */ + err = netdev_set_multiq(port->netdev, ovs_numa_get_n_cores(), + n_rxqs); + if (err && (err != EOPNOTSUPP)) { + VLOG_ERR("Failed to set dpdk interface %s rx_queue to:" + " %u", netdev_get_name(port->netdev), + n_rxqs); + return err; + } - fat_rwlock_wrlock(&dp->queue_rwlock); - if (dp->handler_queues) { - dp_netdev_refresh_queues(dp, n_handlers); + /* If the set_multiq() above succeeds, reopens the 'rxq's. */ + port->rxq = xrealloc(port->rxq, sizeof *port->rxq + * netdev_n_rxq(port->netdev)); + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { + netdev_rxq_open(port->netdev, &port->rxq[i], i); + } + } + } + dp->n_dpdk_rxqs = n_rxqs; + + /* Reconfigures the cpu mask. */ + ovs_numa_set_cpu_mask(cmask); + free(dp->pmd_cmask); + dp->pmd_cmask = cmask ? xstrdup(cmask) : NULL; + + /* Restores the non-pmd. */ + dp_netdev_set_nonpmd(dp); + /* Restores all pmd threads. */ + dp_netdev_reset_pmd_threads(dp); } - fat_rwlock_unlock(&dp->queue_rwlock); return 0; } @@ -1631,172 +2434,129 @@ dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED, return 0; } -static bool -dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id) - OVS_REQ_RDLOCK(dp->queue_rwlock) + +/* Creates and returns a new 'struct dp_netdev_actions', whose actions are + * a copy of the 'ofpacts_len' bytes of 'ofpacts'. 
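pmd_config_changed() simply compares the requested rx-queue count and cpu mask string against the stored configuration; any difference triggers a full pmd reset. A hedged standalone version of the same comparison, with a hypothetical toy_dp structure in place of struct dp_netdev, might read:

/* Sketch of the configuration-change test; field names are stand-ins. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

struct toy_dp {
    size_t n_dpdk_rxqs;
    const char *pmd_cmask;
};

static bool
toy_pmd_config_changed(const struct toy_dp *dp, size_t rxqs, const char *cmask)
{
    if (dp->n_dpdk_rxqs != rxqs) {
        return true;
    }
    if (dp->pmd_cmask && cmask) {
        return strcmp(dp->pmd_cmask, cmask) != 0;
    }
    return dp->pmd_cmask != NULL || cmask != NULL;
}

int
main(void)
{
    struct toy_dp dp = { 2, "0x6" };

    printf("%d\n", toy_pmd_config_changed(&dp, 2, "0x6"));   /* 0: unchanged */
    printf("%d\n", toy_pmd_config_changed(&dp, 4, "0x6"));   /* 1: rxq count */
    printf("%d\n", toy_pmd_config_changed(&dp, 2, NULL));    /* 1: cpu mask */
    return 0;
}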
*/ +struct dp_netdev_actions * +dp_netdev_actions_create(const struct nlattr *actions, size_t size) { - static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); - - if (!dp->handler_queues) { - VLOG_WARN_RL(&rl, "receiving upcall disabled"); - return false; - } + struct dp_netdev_actions *netdev_actions; - if (handler_id >= dp->n_handlers) { - VLOG_WARN_RL(&rl, "handler index out of bound"); - return false; - } + netdev_actions = xmalloc(sizeof *netdev_actions + size); + memcpy(netdev_actions->actions, actions, size); + netdev_actions->size = size; - return true; + return netdev_actions; } -static int -dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id, - struct dpif_upcall *upcall, struct ofpbuf *buf) +struct dp_netdev_actions * +dp_netdev_flow_get_actions(const struct dp_netdev_flow *flow) { - struct dp_netdev *dp = get_dp_netdev(dpif); - struct dp_netdev_queue *q; - int error = 0; - - fat_rwlock_rdlock(&dp->queue_rwlock); - - if (!dp_netdev_recv_check(dp, handler_id)) { - error = EAGAIN; - goto out; - } - - q = &dp->handler_queues[handler_id]; - ovs_mutex_lock(&q->mutex); - if (q->head != q->tail) { - struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK]; - - *upcall = u->upcall; - - ofpbuf_uninit(buf); - *buf = u->buf; - } else { - error = EAGAIN; - } - ovs_mutex_unlock(&q->mutex); - -out: - fat_rwlock_unlock(&dp->queue_rwlock); - - return error; -} + return ovsrcu_get(struct dp_netdev_actions *, &flow->actions); +} static void -dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id) -{ - struct dp_netdev *dp = get_dp_netdev(dpif); - struct dp_netdev_queue *q; - uint64_t seq; - - fat_rwlock_rdlock(&dp->queue_rwlock); - - if (!dp_netdev_recv_check(dp, handler_id)) { - goto out; - } - - q = &dp->handler_queues[handler_id]; - ovs_mutex_lock(&q->mutex); - seq = seq_read(q->seq); - if (q->head != q->tail) { - poll_immediate_wake(); - } else { - seq_wait(q->seq, seq); - } - - ovs_mutex_unlock(&q->mutex); - -out: - fat_rwlock_unlock(&dp->queue_rwlock); -} - -static void -dpif_netdev_recv_purge(struct dpif *dpif) +dp_netdev_actions_free(struct dp_netdev_actions *actions) { - struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif); - - fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock); - dp_netdev_purge_queues(dpif_netdev->dp); - fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock); + free(actions); } -/* Creates and returns a new 'struct dp_netdev_actions', with a reference count - * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of - * 'ofpacts'. */ -struct dp_netdev_actions * -dp_netdev_actions_create(const struct nlattr *actions, size_t size) +static inline unsigned long long +cycles_counter(void) { - struct dp_netdev_actions *netdev_actions; - - netdev_actions = xmalloc(sizeof *netdev_actions); - netdev_actions->actions = xmemdup(actions, size); - netdev_actions->size = size; - - return netdev_actions; +#ifdef DPDK_NETDEV + return rte_get_tsc_cycles(); +#else + return 0; +#endif } -struct dp_netdev_actions * -dp_netdev_flow_get_actions(const struct dp_netdev_flow *flow) +/* Fake mutex to make sure that the calls to cycles_count_* are balanced */ +extern struct ovs_mutex cycles_counter_fake_mutex; + +/* Start counting cycles. 
Must be followed by 'cycles_count_end()' */ +static inline void +cycles_count_start(struct dp_netdev_pmd_thread *pmd) + OVS_ACQUIRES(&cycles_counter_fake_mutex) + OVS_NO_THREAD_SAFETY_ANALYSIS { - return ovsrcu_get(struct dp_netdev_actions *, &flow->actions); + pmd->last_cycles = cycles_counter(); } -static void -dp_netdev_actions_free(struct dp_netdev_actions *actions) +/* Stop counting cycles and add them to the counter 'type' */ +static inline void +cycles_count_end(struct dp_netdev_pmd_thread *pmd, + enum pmd_cycles_counter_type type) + OVS_RELEASES(&cycles_counter_fake_mutex) + OVS_NO_THREAD_SAFETY_ANALYSIS { - free(actions->actions); - free(actions); + unsigned long long interval = cycles_counter() - pmd->last_cycles; + + non_atomic_ullong_add(&pmd->cycles.n[type], interval); } - static void -dp_netdev_process_rxq_port(struct dp_netdev *dp, - struct dp_netdev_port *port, - struct netdev_rxq *rxq) +dp_netdev_process_rxq_port(struct dp_netdev_pmd_thread *pmd, + struct dp_netdev_port *port, + struct netdev_rxq *rxq) { - struct ofpbuf *packet[NETDEV_MAX_RX_BATCH]; - int error, c; + struct dp_packet *packets[NETDEV_MAX_RX_BATCH]; + int error, cnt; - error = netdev_rxq_recv(rxq, packet, &c); + cycles_count_start(pmd); + error = netdev_rxq_recv(rxq, packets, &cnt); + cycles_count_end(pmd, PMD_CYCLES_POLLING); if (!error) { - struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no); int i; - for (i = 0; i < c; i++) { - dp_netdev_port_input(dp, packet[i], &md); + *recirc_depth_get() = 0; + + /* XXX: initialize md in netdev implementation. */ + for (i = 0; i < cnt; i++) { + packets[i]->md = PKT_METADATA_INITIALIZER(port->port_no); } + cycles_count_start(pmd); + dp_netdev_input(pmd, packets, cnt); + cycles_count_end(pmd, PMD_CYCLES_PROCESSING); } else if (error != EAGAIN && error != EOPNOTSUPP) { - static struct vlog_rate_limit rl - = VLOG_RATE_LIMIT_INIT(1, 5); + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5); VLOG_ERR_RL(&rl, "error receiving data from %s: %s", - netdev_get_name(port->netdev), - ovs_strerror(error)); + netdev_get_name(port->netdev), ovs_strerror(error)); } } -static void +/* Return true if needs to revalidate datapath flows. 
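cycles_count_start()/cycles_count_end() above bracket the polling and processing phases of dp_netdev_process_rxq_port() so that each can be accounted separately. The sketch below reproduces the pattern with a portable monotonic clock standing in for rte_get_tsc_cycles(); all toy_* names are illustrative assumptions, not OVS symbols:

/* Minimal sketch of the polling/processing cycle accounting pattern. */
#include <stdint.h>
#include <stdio.h>
#include <time.h>

enum toy_cycles_type { TOY_CYCLES_POLLING, TOY_CYCLES_PROCESSING, TOY_N_CYCLES };

struct toy_pmd {
    uint64_t last_cycles;
    uint64_t cycles[TOY_N_CYCLES];
};

/* Stand-in for rte_get_tsc_cycles(): any monotonic counter works here. */
static uint64_t
toy_counter(void)
{
    struct timespec ts;

    clock_gettime(CLOCK_MONOTONIC, &ts);
    return (uint64_t) ts.tv_sec * 1000000000ull + ts.tv_nsec;
}

static void
toy_cycles_start(struct toy_pmd *pmd)
{
    pmd->last_cycles = toy_counter();
}

/* Charge the interval since toy_cycles_start() to counter 'type'. */
static void
toy_cycles_end(struct toy_pmd *pmd, enum toy_cycles_type type)
{
    pmd->cycles[type] += toy_counter() - pmd->last_cycles;
}

int
main(void)
{
    struct toy_pmd pmd = { 0, { 0, 0 } };

    toy_cycles_start(&pmd);
    /* ... poll the rx queue here ... */
    toy_cycles_end(&pmd, TOY_CYCLES_POLLING);

    toy_cycles_start(&pmd);
    /* ... process the received batch here ... */
    toy_cycles_end(&pmd, TOY_CYCLES_PROCESSING);

    printf("polling=%llu processing=%llu\n",
           (unsigned long long) pmd.cycles[TOY_CYCLES_POLLING],
           (unsigned long long) pmd.cycles[TOY_CYCLES_PROCESSING]);
    return 0;
}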
*/ +static bool dpif_netdev_run(struct dpif *dpif) { struct dp_netdev_port *port; struct dp_netdev *dp = get_dp_netdev(dpif); + struct dp_netdev_pmd_thread *non_pmd = dp_netdev_get_pmd(dp, + NON_PMD_CORE_ID); + uint64_t new_tnl_seq; - ovs_rwlock_rdlock(&dp->port_rwlock); - - HMAP_FOR_EACH (port, node, &dp->ports) { + ovs_mutex_lock(&dp->non_pmd_mutex); + CMAP_FOR_EACH (port, node, &dp->ports) { if (!netdev_is_pmd(port->netdev)) { int i; for (i = 0; i < netdev_n_rxq(port->netdev); i++) { - dp_netdev_process_rxq_port(dp, port, port->rxq[i]); + dp_netdev_process_rxq_port(non_pmd, port, port->rxq[i]); } } } + ovs_mutex_unlock(&dp->non_pmd_mutex); + dp_netdev_pmd_unref(non_pmd); - ovs_rwlock_unlock(&dp->port_rwlock); + tnl_arp_cache_run(); + new_tnl_seq = seq_read(tnl_conf_seq); + + if (dp->last_tnl_conf_seq != new_tnl_seq) { + dp->last_tnl_conf_seq = new_tnl_seq; + return true; + } + return false; } static void @@ -1805,9 +2565,8 @@ dpif_netdev_wait(struct dpif *dpif) struct dp_netdev_port *port; struct dp_netdev *dp = get_dp_netdev(dpif); - ovs_rwlock_rdlock(&dp->port_rwlock); - - HMAP_FOR_EACH (port, node, &dp->ports) { + ovs_mutex_lock(&dp_netdev_mutex); + CMAP_FOR_EACH (port, node, &dp->ports) { if (!netdev_is_pmd(port->netdev)) { int i; @@ -1816,7 +2575,8 @@ dpif_netdev_wait(struct dpif *dpif) } } } - ovs_rwlock_unlock(&dp->port_rwlock); + ovs_mutex_unlock(&dp_netdev_mutex); + seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq); } struct rxq_poll { @@ -1825,44 +2585,48 @@ struct rxq_poll { }; static int -pmd_load_queues(struct pmd_thread *f, +pmd_load_queues(struct dp_netdev_pmd_thread *pmd, struct rxq_poll **ppoll_list, int poll_cnt) { - struct dp_netdev *dp = f->dp; struct rxq_poll *poll_list = *ppoll_list; struct dp_netdev_port *port; - int id = f->id; - int index; - int i; + int n_pmds_on_numa, index, i; /* Simple scheduler for netdev rx polling. */ - ovs_rwlock_rdlock(&dp->port_rwlock); for (i = 0; i < poll_cnt; i++) { - port_unref(poll_list[i].port); + port_unref(poll_list[i].port); } poll_cnt = 0; + n_pmds_on_numa = get_n_pmd_threads_on_numa(pmd->dp, pmd->numa_id); index = 0; - HMAP_FOR_EACH (port, node, &f->dp->ports) { - if (netdev_is_pmd(port->netdev)) { - int i; - - for (i = 0; i < netdev_n_rxq(port->netdev); i++) { - if ((index % dp->n_pmd_threads) == id) { - poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1)); - - port_ref(port); - poll_list[poll_cnt].port = port; - poll_list[poll_cnt].rx = port->rxq[i]; - poll_cnt++; + CMAP_FOR_EACH (port, node, &pmd->dp->ports) { + /* Calls port_try_ref() to prevent the main thread + * from deleting the port. */ + if (port_try_ref(port)) { + if (netdev_is_pmd(port->netdev) + && netdev_get_numa_id(port->netdev) == pmd->numa_id) { + int i; + + for (i = 0; i < netdev_n_rxq(port->netdev); i++) { + if ((index % n_pmds_on_numa) == pmd->index) { + poll_list = xrealloc(poll_list, + sizeof *poll_list * (poll_cnt + 1)); + + port_ref(port); + poll_list[poll_cnt].port = port; + poll_list[poll_cnt].rx = port->rxq[i]; + poll_cnt++; + } + index++; } - index++; } + /* Unrefs the port_try_ref(). 
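The scheduler in pmd_load_queues() is deliberately simple: the rx queues of pmd ports on a numa node are numbered consecutively, and queue 'index' is polled by the pmd thread whose index equals index % n_pmds_on_numa. A tiny standalone illustration of that assignment (toy counts, not real ports or queues):

/* Round-robin rxq-to-pmd assignment, as in pmd_load_queues(). */
#include <stdio.h>

#define MAX_PMDS 8

int
main(void)
{
    int n_pmds = 3;                 /* pmd threads on this numa node. */
    int n_rxqs = 8;                 /* rx queues across its pmd ports. */
    int assigned[MAX_PMDS] = { 0 };

    for (int index = 0; index < n_rxqs; index++) {
        int pmd_index = index % n_pmds;

        printf("rxq %d -> pmd %d\n", index, pmd_index);
        assigned[pmd_index]++;
    }
    for (int i = 0; i < n_pmds; i++) {
        printf("pmd %d polls %d queues\n", i, assigned[i]);
    }
    return 0;
}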
*/ + port_unref(port); } } - ovs_rwlock_unlock(&dp->port_rwlock); *ppoll_list = poll_list; return poll_cnt; } @@ -1870,45 +2634,53 @@ pmd_load_queues(struct pmd_thread *f, static void * pmd_thread_main(void *f_) { - struct pmd_thread *f = f_; - struct dp_netdev *dp = f->dp; + struct dp_netdev_pmd_thread *pmd = f_; unsigned int lc = 0; struct rxq_poll *poll_list; - unsigned int port_seq; + unsigned int port_seq = PMD_INITIAL_SEQ; int poll_cnt; int i; poll_cnt = 0; poll_list = NULL; - pmd_thread_setaffinity_cpu(f->id); + /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */ + ovsthread_setspecific(pmd->dp->per_pmd_key, pmd); + pmd_thread_setaffinity_cpu(pmd->core_id); reload: - poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt); - atomic_read(&f->change_seq, &port_seq); + emc_cache_init(&pmd->flow_cache); + poll_cnt = pmd_load_queues(pmd, &poll_list, poll_cnt); + + /* Signal here to make sure the pmd finishes + * reloading the updated configuration. */ + dp_netdev_pmd_reload_done(pmd); for (;;) { - unsigned int c_port_seq; int i; for (i = 0; i < poll_cnt; i++) { - dp_netdev_process_rxq_port(dp, poll_list[i].port, poll_list[i].rx); + dp_netdev_process_rxq_port(pmd, poll_list[i].port, poll_list[i].rx); } if (lc++ > 1024) { - ovsrcu_quiesce(); + unsigned int seq; - /* TODO: need completely userspace based signaling method. - * to keep this thread entirely in userspace. - * For now using atomic counter. */ lc = 0; - atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume); - if (c_port_seq != port_seq) { + + emc_cache_slow_sweep(&pmd->flow_cache); + ovsrcu_quiesce(); + + atomic_read_relaxed(&pmd->change_seq, &seq); + if (seq != port_seq) { + port_seq = seq; break; } } } - if (!latch_is_set(&f->dp->exit_latch)){ + emc_cache_uninit(&pmd->flow_cache); + + if (!latch_is_set(&pmd->exit_latch)){ goto reload; } @@ -1916,280 +2688,821 @@ reload: port_unref(poll_list[i].port); } + dp_netdev_pmd_reload_done(pmd); + free(poll_list); return NULL; } static void -dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n) +dp_netdev_disable_upcall(struct dp_netdev *dp) + OVS_ACQUIRES(dp->upcall_rwlock) { - int i; + fat_rwlock_wrlock(&dp->upcall_rwlock); +} - if (n == dp->n_pmd_threads) { - return; - } +static void +dpif_netdev_disable_upcall(struct dpif *dpif) + OVS_NO_THREAD_SAFETY_ANALYSIS +{ + struct dp_netdev *dp = get_dp_netdev(dpif); + dp_netdev_disable_upcall(dp); +} - /* Stop existing threads. */ - latch_set(&dp->exit_latch); - dp_netdev_reload_pmd_threads(dp); - for (i = 0; i < dp->n_pmd_threads; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; +static void +dp_netdev_enable_upcall(struct dp_netdev *dp) + OVS_RELEASES(dp->upcall_rwlock) +{ + fat_rwlock_unlock(&dp->upcall_rwlock); +} + +static void +dpif_netdev_enable_upcall(struct dpif *dpif) + OVS_NO_THREAD_SAFETY_ANALYSIS +{ + struct dp_netdev *dp = get_dp_netdev(dpif); + dp_netdev_enable_upcall(dp); +} + +void +dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd) +{ + ovs_mutex_lock(&pmd->cond_mutex); + xpthread_cond_signal(&pmd->cond); + ovs_mutex_unlock(&pmd->cond_mutex); +} - xpthread_join(f->thread, NULL); +/* Finds and refs the dp_netdev_pmd_thread on core 'core_id'. Returns + * the pointer if succeeds, otherwise, NULL. + * + * Caller must unrefs the returned reference. 
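pmd_thread_main() above polls its queues in a tight loop and only every 1024 iterations quiesces for RCU, sweeps the exact match cache, and re-reads the change sequence to decide whether to reload its queue list or exit. The standalone sketch below reproduces that signalling with plain C11 atomics and a pthread; the real code uses OVS latches, condition variables, and a relaxed sequence read, so the names and memory orderings here are simplified assumptions:

/* Sketch of the reload/exit signalling in a pmd polling loop. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

static atomic_uint change_seq = 1;      /* Bumped on configuration change. */
static atomic_bool exit_latch = false;  /* Set to ask the thread to stop. */

static void *
toy_pmd_main(void *arg)
{
    unsigned int port_seq = 1;          /* Initial sequence value. */
    unsigned int lc = 0;

    (void) arg;
reload:
    printf("pmd: (re)loading rx queue assignments\n");

    for (;;) {
        /* ... poll and process the assigned rx queues here ... */

        if (lc++ > 1024) {
            unsigned int seq;

            lc = 0;
            /* This is also where the real thread quiesces for RCU and
             * sweeps its exact match cache. */
            seq = atomic_load(&change_seq);
            if (seq != port_seq) {
                port_seq = seq;
                break;
            }
        }
    }

    if (!atomic_load(&exit_latch)) {
        goto reload;
    }
    return NULL;
}

int
main(void)
{
    pthread_t tid;

    pthread_create(&tid, NULL, toy_pmd_main, NULL);
    usleep(10000);
    atomic_store(&exit_latch, true);    /* Ask the thread to stop... */
    atomic_fetch_add(&change_seq, 1);   /* ...and kick it out of the loop. */
    pthread_join(tid, NULL);
    printf("pmd thread joined\n");
    return 0;
}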
*/ +static struct dp_netdev_pmd_thread * +dp_netdev_get_pmd(struct dp_netdev *dp, int core_id) +{ + struct dp_netdev_pmd_thread *pmd; + const struct cmap_node *pnode; + + pnode = cmap_find(&dp->poll_threads, hash_int(core_id, 0)); + if (!pnode) { + return NULL; } - latch_poll(&dp->exit_latch); - free(dp->pmd_threads); + pmd = CONTAINER_OF(pnode, struct dp_netdev_pmd_thread, node); - /* Start new threads. */ - dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads); - dp->n_pmd_threads = n; + return dp_netdev_pmd_try_ref(pmd) ? pmd : NULL; +} + +/* Sets the 'struct dp_netdev_pmd_thread' for non-pmd threads. */ +static void +dp_netdev_set_nonpmd(struct dp_netdev *dp) +{ + struct dp_netdev_pmd_thread *non_pmd; - for (i = 0; i < n; i++) { - struct pmd_thread *f = &dp->pmd_threads[i]; + non_pmd = xzalloc(sizeof *non_pmd); + dp_netdev_configure_pmd(non_pmd, dp, 0, NON_PMD_CORE_ID, + OVS_NUMA_UNSPEC); +} - f->dp = dp; - f->id = i; - atomic_store(&f->change_seq, 1); +/* Caller must have valid pointer to 'pmd'. */ +static bool +dp_netdev_pmd_try_ref(struct dp_netdev_pmd_thread *pmd) +{ + return ovs_refcount_try_ref_rcu(&pmd->ref_cnt); +} - /* Each thread will distribute all devices rx-queues among - * themselves. */ - f->thread = ovs_thread_create("pmd", pmd_thread_main, f); +static void +dp_netdev_pmd_unref(struct dp_netdev_pmd_thread *pmd) +{ + if (pmd && ovs_refcount_unref(&pmd->ref_cnt) == 1) { + ovsrcu_postpone(dp_netdev_destroy_pmd, pmd); } } - -static void * -dp_netdev_flow_stats_new_cb(void) +/* Given cmap position 'pos', tries to ref the next node. If try_ref() + * fails, keeps checking for next node until reaching the end of cmap. + * + * Caller must unrefs the returned reference. */ +static struct dp_netdev_pmd_thread * +dp_netdev_pmd_get_next(struct dp_netdev *dp, struct cmap_position *pos) { - struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket); - ovs_mutex_init(&bucket->mutex); - return bucket; + struct dp_netdev_pmd_thread *next; + + do { + struct cmap_node *node; + + node = cmap_next_position(&dp->poll_threads, pos); + next = node ? CONTAINER_OF(node, struct dp_netdev_pmd_thread, node) + : NULL; + } while (next && !dp_netdev_pmd_try_ref(next)); + + return next; } +/* Configures the 'pmd' based on the input argument. */ static void -dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, - const struct ofpbuf *packet, - const struct miniflow *key) -{ - uint16_t tcp_flags = miniflow_get_tcp_flags(key); - long long int now = time_msec(); - struct dp_netdev_flow_stats *bucket; +dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp, + int index, int core_id, int numa_id) +{ + pmd->dp = dp; + pmd->index = index; + pmd->core_id = core_id; + pmd->numa_id = numa_id; + + ovs_refcount_init(&pmd->ref_cnt); + latch_init(&pmd->exit_latch); + atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ); + xpthread_cond_init(&pmd->cond, NULL); + ovs_mutex_init(&pmd->cond_mutex); + ovs_mutex_init(&pmd->flow_mutex); + dpcls_init(&pmd->cls); + cmap_init(&pmd->flow_table); + /* init the 'flow_cache' since there is no + * actual thread created for NON_PMD_CORE_ID. 
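dp_netdev_get_pmd() and dp_netdev_pmd_get_next() above only hand out a pmd thread after ovs_refcount_try_ref_rcu() succeeds, so threads that are being torn down are skipped rather than resurrected. A compact sketch of that try-ref pattern using C11 atomics in place of the OVS refcount helpers (toy_* names are stand-ins):

/* "Try ref, else skip" reference counting sketch. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct toy_pmd {
    atomic_uint ref_cnt;
    int core_id;
};

/* Take a reference only if the count is still non-zero. */
static bool
toy_pmd_try_ref(struct toy_pmd *pmd)
{
    unsigned int count = atomic_load(&pmd->ref_cnt);

    while (count) {
        if (atomic_compare_exchange_weak(&pmd->ref_cnt, &count, count + 1)) {
            return true;
        }
    }
    return false;
}

static void
toy_pmd_unref(struct toy_pmd *pmd)
{
    if (atomic_fetch_sub(&pmd->ref_cnt, 1) == 1) {
        printf("pmd on core %d can now be destroyed\n", pmd->core_id);
    }
}

int
main(void)
{
    struct toy_pmd alive = { 1, 3 };   /* ref_cnt 1: owned by the datapath. */
    struct toy_pmd dying = { 0, 5 };   /* ref_cnt 0: being torn down. */

    printf("ref alive pmd: %s\n", toy_pmd_try_ref(&alive) ? "ok" : "skip");
    printf("ref dying pmd: %s\n", toy_pmd_try_ref(&dying) ? "ok" : "skip");
    toy_pmd_unref(&alive);             /* Drop the reference we just took. */
    return 0;
}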
*/ + if (core_id == NON_PMD_CORE_ID) { + emc_cache_init(&pmd->flow_cache); + } + cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *, &pmd->node), + hash_int(core_id, 0)); +} - bucket = ovsthread_stats_bucket_get(&netdev_flow->stats, - dp_netdev_flow_stats_new_cb); +static void +dp_netdev_destroy_pmd(struct dp_netdev_pmd_thread *pmd) +{ + dp_netdev_pmd_flow_flush(pmd); + dpcls_destroy(&pmd->cls); + cmap_destroy(&pmd->flow_table); + ovs_mutex_destroy(&pmd->flow_mutex); + latch_destroy(&pmd->exit_latch); + xpthread_cond_destroy(&pmd->cond); + ovs_mutex_destroy(&pmd->cond_mutex); + free(pmd); +} - ovs_mutex_lock(&bucket->mutex); - bucket->used = MAX(now, bucket->used); - bucket->packet_count++; - bucket->byte_count += ofpbuf_size(packet); - bucket->tcp_flags |= tcp_flags; - ovs_mutex_unlock(&bucket->mutex); +/* Stops the pmd thread, removes it from the 'dp->poll_threads', + * and unrefs the struct. */ +static void +dp_netdev_del_pmd(struct dp_netdev_pmd_thread *pmd) +{ + /* Uninit the 'flow_cache' since there is + * no actual thread uninit it for NON_PMD_CORE_ID. */ + if (pmd->core_id == NON_PMD_CORE_ID) { + emc_cache_uninit(&pmd->flow_cache); + } else { + latch_set(&pmd->exit_latch); + dp_netdev_reload_pmd__(pmd); + ovs_numa_unpin_core(pmd->core_id); + xpthread_join(pmd->thread, NULL); + } + cmap_remove(&pmd->dp->poll_threads, &pmd->node, hash_int(pmd->core_id, 0)); + dp_netdev_pmd_unref(pmd); } -static void * -dp_netdev_stats_new_cb(void) +/* Destroys all pmd threads. */ +static void +dp_netdev_destroy_all_pmds(struct dp_netdev *dp) { - struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket); - ovs_mutex_init(&bucket->mutex); - return bucket; + struct dp_netdev_pmd_thread *pmd; + + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + dp_netdev_del_pmd(pmd); + } } +/* Deletes all pmd threads on numa node 'numa_id'. */ static void -dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type) +dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id) { - struct dp_netdev_stats *bucket; + struct dp_netdev_pmd_thread *pmd; - bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb); - ovs_mutex_lock(&bucket->mutex); - bucket->n[type]++; - ovs_mutex_unlock(&bucket->mutex); + CMAP_FOR_EACH (pmd, node, &dp->poll_threads) { + if (pmd->numa_id == numa_id) { + dp_netdev_del_pmd(pmd); + } + } } +/* Checks the numa node id of 'netdev' and starts pmd threads for + * the numa node. */ static void -dp_netdev_input(struct dp_netdev *dp, struct ofpbuf *packet, - struct pkt_metadata *md) - OVS_REQ_RDLOCK(dp->port_rwlock) +dp_netdev_set_pmds_on_numa(struct dp_netdev *dp, int numa_id) { - struct dp_netdev_flow *netdev_flow; - struct { - struct miniflow flow; - uint32_t buf[FLOW_U32S]; - } key; + int n_pmds; - if (ofpbuf_size(packet) < ETH_HEADER_LEN) { - ofpbuf_delete(packet); - return; + if (!ovs_numa_numa_id_is_valid(numa_id)) { + VLOG_ERR("Cannot create pmd threads due to numa id (%d)" + "invalid", numa_id); + return ; } - miniflow_initialize(&key.flow, key.buf); - miniflow_extract(packet, md, &key.flow); - netdev_flow = dp_netdev_lookup_flow(dp, &key.flow); - if (netdev_flow) { - struct dp_netdev_actions *actions; + n_pmds = get_n_pmd_threads_on_numa(dp, numa_id); - dp_netdev_flow_used(netdev_flow, packet, &key.flow); + /* If there are already pmd threads created for the numa node + * in which 'netdev' is on, do nothing. Else, creates the + * pmd threads for the numa node. 
*/ + if (!n_pmds) { + int can_have, n_unpinned, i; - actions = dp_netdev_flow_get_actions(netdev_flow); - dp_netdev_execute_actions(dp, &key.flow, packet, true, md, - actions->actions, actions->size); - dp_netdev_count_packet(dp, DP_STAT_HIT); - } else if (dp->handler_queues) { - dp_netdev_count_packet(dp, DP_STAT_MISS); - dp_netdev_output_userspace(dp, packet, - miniflow_hash_5tuple(&key.flow, 0) - % dp->n_handlers, - DPIF_UC_MISS, &key.flow, NULL); - ofpbuf_delete(packet); + n_unpinned = ovs_numa_get_n_unpinned_cores_on_numa(numa_id); + if (!n_unpinned) { + VLOG_ERR("Cannot create pmd threads due to out of unpinned " + "cores on numa node"); + return; + } + + /* If cpu mask is specified, uses all unpinned cores, otherwise + * tries creating NR_PMD_THREADS pmd threads. */ + can_have = dp->pmd_cmask ? n_unpinned : MIN(n_unpinned, NR_PMD_THREADS); + for (i = 0; i < can_have; i++) { + struct dp_netdev_pmd_thread *pmd = xzalloc(sizeof *pmd); + int core_id = ovs_numa_get_unpinned_core_on_numa(numa_id); + + dp_netdev_configure_pmd(pmd, dp, i, core_id, numa_id); + /* Each thread will distribute all devices rx-queues among + * themselves. */ + pmd->thread = ovs_thread_create("pmd", pmd_thread_main, pmd); + } + VLOG_INFO("Created %d pmd threads on numa node %d", can_have, numa_id); } } + +/* Called after pmd threads config change. Restarts pmd threads with + * new configuration. */ static void -dp_netdev_port_input(struct dp_netdev *dp, struct ofpbuf *packet, - struct pkt_metadata *md) - OVS_REQ_RDLOCK(dp->port_rwlock) +dp_netdev_reset_pmd_threads(struct dp_netdev *dp) { - uint32_t *recirc_depth = recirc_depth_get(); + struct dp_netdev_port *port; - *recirc_depth = 0; - dp_netdev_input(dp, packet, md); + CMAP_FOR_EACH (port, node, &dp->ports) { + if (netdev_is_pmd(port->netdev)) { + int numa_id = netdev_get_numa_id(port->netdev); + + dp_netdev_set_pmds_on_numa(dp, numa_id); + } + } +} + +static char * +dpif_netdev_get_datapath_version(void) +{ + return xstrdup(""); +} + +static void +dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow, int cnt, int size, + uint16_t tcp_flags, long long now) +{ + uint16_t flags; + + atomic_store_relaxed(&netdev_flow->stats.used, now); + non_atomic_ullong_add(&netdev_flow->stats.packet_count, cnt); + non_atomic_ullong_add(&netdev_flow->stats.byte_count, size); + atomic_read_relaxed(&netdev_flow->stats.tcp_flags, &flags); + flags |= tcp_flags; + atomic_store_relaxed(&netdev_flow->stats.tcp_flags, flags); +} + +static void +dp_netdev_count_packet(struct dp_netdev_pmd_thread *pmd, + enum dp_stat_type type, int cnt) +{ + non_atomic_ullong_add(&pmd->stats.n[type], cnt); } static int -dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet, - int queue_no, int type, const struct miniflow *key, - const struct nlattr *userdata) +dp_netdev_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet_, + struct flow *flow, struct flow_wildcards *wc, ovs_u128 *ufid, + enum dpif_upcall_type type, const struct nlattr *userdata, + struct ofpbuf *actions, struct ofpbuf *put_actions) { - struct dp_netdev_queue *q; - int error; + struct dp_netdev *dp = pmd->dp; + + if (OVS_UNLIKELY(!dp->upcall_cb)) { + return ENODEV; + } + + if (OVS_UNLIKELY(!VLOG_DROP_DBG(&upcall_rl))) { + struct ds ds = DS_EMPTY_INITIALIZER; + char *packet_str; + struct ofpbuf key; + + ofpbuf_init(&key, 0); + odp_flow_key_from_flow(&key, flow, &wc->masks, flow->in_port.odp_port, + true); + packet_str = ofp_packet_to_string(dp_packet_data(packet_), + dp_packet_size(packet_)); + + 
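dp_netdev_flow_used() above updates per-flow statistics with relaxed atomics: only the owning pmd thread writes, while get_dpif_flow_stats() earlier reads the same fields with relaxed loads. A self-contained sketch of that update, with C11 atomics standing in for the OVS wrappers (the load/store pairs mirror non_atomic_ullong_add()):

/* Relaxed-atomic flow statistics update, single writer, many readers. */
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

struct toy_flow_stats {
    atomic_ullong packet_count;
    atomic_ullong byte_count;
    atomic_llong used;                 /* Last-used timestamp, msec. */
    atomic_ushort tcp_flags;
};

static void
toy_flow_used(struct toy_flow_stats *s, int cnt, int size,
              uint16_t tcp_flags, long long now)
{
    unsigned long long n;
    unsigned short flags;

    atomic_store_explicit(&s->used, now, memory_order_relaxed);

    /* Only the owning pmd thread writes, so a plain read-modify-write with
     * relaxed loads and stores is sufficient. */
    n = atomic_load_explicit(&s->packet_count, memory_order_relaxed);
    atomic_store_explicit(&s->packet_count, n + cnt, memory_order_relaxed);
    n = atomic_load_explicit(&s->byte_count, memory_order_relaxed);
    atomic_store_explicit(&s->byte_count, n + size, memory_order_relaxed);

    flags = atomic_load_explicit(&s->tcp_flags, memory_order_relaxed);
    atomic_store_explicit(&s->tcp_flags, flags | tcp_flags,
                          memory_order_relaxed);
}

int
main(void)
{
    struct toy_flow_stats stats = { 0, 0, 0, 0 };

    toy_flow_used(&stats, 4, 256, 0x12, 1000);
    printf("packets=%llu bytes=%llu\n",
           (unsigned long long) atomic_load(&stats.packet_count),
           (unsigned long long) atomic_load(&stats.byte_count));
    return 0;
}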
odp_flow_key_format(key.data, key.size, &ds); + + VLOG_DBG("%s: %s upcall:\n%s\n%s", dp->name, + dpif_upcall_type_to_string(type), ds_cstr(&ds), packet_str); + + ofpbuf_uninit(&key); + free(packet_str); + + ds_destroy(&ds); + } + + return dp->upcall_cb(packet_, flow, ufid, pmd->core_id, type, userdata, + actions, wc, put_actions, dp->upcall_aux); +} + +static inline uint32_t +dpif_netdev_packet_get_dp_hash(struct dp_packet *packet, + const struct miniflow *mf) +{ + uint32_t hash; + + hash = dp_packet_get_rss_hash(packet); + if (OVS_UNLIKELY(!hash)) { + hash = miniflow_hash_5tuple(mf, 0); + dp_packet_set_rss_hash(packet, hash); + } + return hash; +} + +struct packet_batch { + unsigned int packet_count; + unsigned int byte_count; + uint16_t tcp_flags; + + struct dp_netdev_flow *flow; + + struct dp_packet *packets[NETDEV_MAX_RX_BATCH]; +}; + +static inline void +packet_batch_update(struct packet_batch *batch, struct dp_packet *packet, + const struct miniflow *mf) +{ + batch->tcp_flags |= miniflow_get_tcp_flags(mf); + batch->packets[batch->packet_count++] = packet; + batch->byte_count += dp_packet_size(packet); +} + +static inline void +packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow) +{ + batch->flow = flow; + + batch->packet_count = 0; + batch->byte_count = 0; + batch->tcp_flags = 0; +} + +static inline void +packet_batch_execute(struct packet_batch *batch, + struct dp_netdev_pmd_thread *pmd, + enum dp_stat_type hit_type, + long long now) +{ + struct dp_netdev_actions *actions; + struct dp_netdev_flow *flow = batch->flow; - fat_rwlock_rdlock(&dp->queue_rwlock); - q = &dp->handler_queues[queue_no]; - ovs_mutex_lock(&q->mutex); - if (q->head - q->tail < MAX_QUEUE_LEN) { - struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK]; - struct dpif_upcall *upcall = &u->upcall; - struct ofpbuf *buf = &u->buf; - size_t buf_size; - struct flow flow; + dp_netdev_flow_used(batch->flow, batch->packet_count, batch->byte_count, + batch->tcp_flags, now); - upcall->type = type; + actions = dp_netdev_flow_get_actions(flow); + + dp_netdev_execute_actions(pmd, batch->packets, batch->packet_count, true, + actions->actions, actions->size); + + dp_netdev_count_packet(pmd, hit_type, batch->packet_count); +} - /* Allocate buffer big enough for everything. */ - buf_size = ODPUTIL_FLOW_KEY_BYTES; - if (userdata) { - buf_size += NLA_ALIGN(userdata->nla_len); +static inline bool +dp_netdev_queue_batches(struct dp_packet *pkt, + struct dp_netdev_flow *flow, const struct miniflow *mf, + struct packet_batch *batches, size_t *n_batches, + size_t max_batches) +{ + struct packet_batch *batch = NULL; + int j; + + if (OVS_UNLIKELY(!flow)) { + return false; + } + /* XXX: This O(n^2) algortihm makes sense if we're operating under the + * assumption that the number of distinct flows (and therefore the + * number of distinct batches) is quite small. If this turns out not + * to be the case, it may make sense to pre sort based on the + * netdev_flow pointer. That done we can get the appropriate batching + * in O(n * log(n)) instead. */ + for (j = *n_batches - 1; j >= 0; j--) { + if (batches[j].flow == flow) { + batch = &batches[j]; + packet_batch_update(batch, pkt, mf); + return true; } - buf_size += ofpbuf_size(packet); - ofpbuf_init(buf, buf_size); + } + if (OVS_UNLIKELY(*n_batches >= max_batches)) { + return false; + } + + batch = &batches[(*n_batches)++]; + packet_batch_init(batch, flow); + packet_batch_update(batch, pkt, mf); + return true; +} - /* Put ODP flow. 
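dp_netdev_queue_batches() above groups packets by matched flow with a linear scan, on the assumption stated in its comment that an rx burst rarely contains more than a handful of distinct flows. A standalone sketch of the same grouping, with integers standing in for packet and flow pointers:

/* Per-flow packet batching with a linear batch lookup. */
#include <stddef.h>
#include <stdio.h>

#define BATCH_MAX 32

struct toy_batch {
    int flow_id;                 /* Stand-in for the dp_netdev_flow pointer. */
    int packets[BATCH_MAX];
    size_t n;
};

/* Append 'pkt' to the batch for 'flow_id', creating one if needed.
 * Returns 0 if all 'max' batch slots are taken, like the OVS helper. */
static int
toy_queue_batch(int pkt, int flow_id,
                struct toy_batch *batches, size_t *n_batches, size_t max)
{
    for (size_t j = 0; j < *n_batches; j++) {
        if (batches[j].flow_id == flow_id) {
            batches[j].packets[batches[j].n++] = pkt;
            return 1;
        }
    }
    if (*n_batches >= max) {
        return 0;                /* Caller falls back to the slow path. */
    }
    batches[*n_batches].flow_id = flow_id;
    batches[*n_batches].n = 0;
    batches[*n_batches].packets[batches[*n_batches].n++] = pkt;
    (*n_batches)++;
    return 1;
}

int
main(void)
{
    struct toy_batch batches[4];
    size_t n_batches = 0;
    int flows[] = { 7, 7, 9, 7, 9 };   /* Flow hit by each packet. */

    for (int pkt = 0; pkt < 5; pkt++) {
        toy_queue_batch(pkt, flows[pkt], batches, &n_batches, 4);
    }
    for (size_t j = 0; j < n_batches; j++) {
        printf("flow %d: %zu packets\n", batches[j].flow_id, batches[j].n);
    }
    return 0;
}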
*/ - miniflow_expand(key, &flow); - odp_flow_key_from_flow(buf, &flow, NULL, flow.in_port.odp_port); - upcall->key = ofpbuf_data(buf); - upcall->key_len = ofpbuf_size(buf); +static inline void +dp_packet_swap(struct dp_packet **a, struct dp_packet **b) +{ + struct dp_packet *tmp = *a; + *a = *b; + *b = tmp; +} - /* Put userdata. */ - if (userdata) { - upcall->userdata = ofpbuf_put(buf, userdata, - NLA_ALIGN(userdata->nla_len)); +/* Try to process all ('cnt') the 'packets' using only the exact match cache + * 'flow_cache'. If a flow is not found for a packet 'packets[i]', or if there + * is no matching batch for a packet's flow, the miniflow is copied into 'keys' + * and the packet pointer is moved at the beginning of the 'packets' array. + * + * The function returns the number of packets that needs to be processed in the + * 'packets' array (they have been moved to the beginning of the vector). + */ +static inline size_t +emc_processing(struct dp_netdev_pmd_thread *pmd, struct dp_packet **packets, + size_t cnt, struct netdev_flow_key *keys, long long now) +{ + struct netdev_flow_key key; + struct packet_batch batches[4]; + struct emc_cache *flow_cache = &pmd->flow_cache; + size_t n_batches, i; + size_t notfound_cnt = 0; + + n_batches = 0; + miniflow_initialize(&key.mf, key.buf); + for (i = 0; i < cnt; i++) { + struct dp_netdev_flow *flow; + + if (OVS_UNLIKELY(dp_packet_size(packets[i]) < ETH_HEADER_LEN)) { + dp_packet_delete(packets[i]); + continue; } - ofpbuf_set_data(&upcall->packet, - ofpbuf_put(buf, ofpbuf_data(packet), ofpbuf_size(packet))); - ofpbuf_set_size(&upcall->packet, ofpbuf_size(packet)); + miniflow_extract(packets[i], &key.mf); + key.len = 0; /* Not computed yet. */ + key.hash = dpif_netdev_packet_get_dp_hash(packets[i], &key.mf); - seq_change(q->seq); + flow = emc_lookup(flow_cache, &key); + if (OVS_UNLIKELY(!dp_netdev_queue_batches(packets[i], flow, &key.mf, + batches, &n_batches, + ARRAY_SIZE(batches)))) { + if (i != notfound_cnt) { + dp_packet_swap(&packets[i], &packets[notfound_cnt]); + } - error = 0; - } else { - dp_netdev_count_packet(dp, DP_STAT_LOST); - error = ENOBUFS; + keys[notfound_cnt++] = key; + } } - ovs_mutex_unlock(&q->mutex); - fat_rwlock_unlock(&dp->queue_rwlock); - return error; + for (i = 0; i < n_batches; i++) { + packet_batch_execute(&batches[i], pmd, DP_STAT_EXACT_HIT, now); + } + + return notfound_cnt; +} + +static inline void +fast_path_processing(struct dp_netdev_pmd_thread *pmd, + struct dp_packet **packets, size_t cnt, + struct netdev_flow_key *keys, long long now) +{ +#if !defined(__CHECKER__) && !defined(_WIN32) + const size_t PKT_ARRAY_SIZE = cnt; +#else + /* Sparse or MSVC doesn't like variable length array. */ + enum { PKT_ARRAY_SIZE = NETDEV_MAX_RX_BATCH }; +#endif + struct packet_batch batches[PKT_ARRAY_SIZE]; + struct dpcls_rule *rules[PKT_ARRAY_SIZE]; + struct dp_netdev *dp = pmd->dp; + struct emc_cache *flow_cache = &pmd->flow_cache; + size_t n_batches, i; + bool any_miss; + + for (i = 0; i < cnt; i++) { + /* Key length is needed in all the cases, hash computed on demand. 
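emc_processing() above handles exact-match-cache hits immediately in batches and compacts the misses to the front of the packet array, so that fast_path_processing() only looks at the first notfound_cnt entries. The sketch below shows just that compaction step with a fake hit predicate; everything prefixed toy_ is illustrative:

/* Compaction of cache misses to the front of the packet array. */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

static bool
toy_emc_hit(int pkt)
{
    return pkt % 2 == 0;         /* Pretend even packet ids hit the cache. */
}

static size_t
toy_emc_processing(int *packets, size_t cnt)
{
    size_t notfound_cnt = 0;

    for (size_t i = 0; i < cnt; i++) {
        if (!toy_emc_hit(packets[i])) {
            int tmp = packets[i];

            packets[i] = packets[notfound_cnt];
            packets[notfound_cnt++] = tmp;
        }
    }
    return notfound_cnt;         /* Slow path handles packets[0..ret). */
}

int
main(void)
{
    int packets[] = { 0, 1, 2, 3, 4, 5 };
    size_t misses = toy_emc_processing(packets, 6);

    printf("%zu misses:", misses);
    for (size_t i = 0; i < misses; i++) {
        printf(" %d", packets[i]);
    }
    printf("\n");
    return 0;
}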
*/ + keys[i].len = netdev_flow_key_size(count_1bits(keys[i].mf.map)); + } + any_miss = !dpcls_lookup(&pmd->cls, keys, rules, cnt); + if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) { + uint64_t actions_stub[512 / 8], slow_stub[512 / 8]; + struct ofpbuf actions, put_actions; + int miss_cnt = 0, lost_cnt = 0; + ovs_u128 ufid; + + ofpbuf_use_stub(&actions, actions_stub, sizeof actions_stub); + ofpbuf_use_stub(&put_actions, slow_stub, sizeof slow_stub); + + for (i = 0; i < cnt; i++) { + struct dp_netdev_flow *netdev_flow; + struct ofpbuf *add_actions; + struct match match; + int error; + + if (OVS_LIKELY(rules[i])) { + continue; + } + + /* It's possible that an earlier slow path execution installed + * a rule covering this flow. In this case, it's a lot cheaper + * to catch it here than execute a miss. */ + netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]); + if (netdev_flow) { + rules[i] = &netdev_flow->cr; + continue; + } + + miss_cnt++; + + miniflow_expand(&keys[i].mf, &match.flow); + + ofpbuf_clear(&actions); + ofpbuf_clear(&put_actions); + + dpif_flow_hash(dp->dpif, &match.flow, sizeof match.flow, &ufid); + error = dp_netdev_upcall(pmd, packets[i], &match.flow, &match.wc, + &ufid, DPIF_UC_MISS, NULL, &actions, + &put_actions); + if (OVS_UNLIKELY(error && error != ENOSPC)) { + dp_packet_delete(packets[i]); + lost_cnt++; + continue; + } + + /* We can't allow the packet batching in the next loop to execute + * the actions. Otherwise, if there are any slow path actions, + * we'll send the packet up twice. */ + dp_netdev_execute_actions(pmd, &packets[i], 1, true, + actions.data, actions.size); + + add_actions = put_actions.size ? &put_actions : &actions; + if (OVS_LIKELY(error != ENOSPC)) { + /* XXX: There's a race window where a flow covering this packet + * could have already been installed since we last did the flow + * lookup before upcall. This could be solved by moving the + * mutex lock outside the loop, but that's an awful long time + * to be locking everyone out of making flow installs. If we + * move to a per-core classifier, it would be reasonable. 
*/ + ovs_mutex_lock(&pmd->flow_mutex); + netdev_flow = dp_netdev_pmd_lookup_flow(pmd, &keys[i]); + if (OVS_LIKELY(!netdev_flow)) { + netdev_flow = dp_netdev_flow_add(pmd, &match, &ufid, + add_actions->data, + add_actions->size); + } + ovs_mutex_unlock(&pmd->flow_mutex); + + emc_insert(flow_cache, &keys[i], netdev_flow); + } + } + + ofpbuf_uninit(&actions); + ofpbuf_uninit(&put_actions); + fat_rwlock_unlock(&dp->upcall_rwlock); + dp_netdev_count_packet(pmd, DP_STAT_MISS, miss_cnt); + dp_netdev_count_packet(pmd, DP_STAT_LOST, lost_cnt); + } else if (OVS_UNLIKELY(any_miss)) { + int dropped_cnt = 0; + + for (i = 0; i < cnt; i++) { + if (OVS_UNLIKELY(!rules[i])) { + dp_packet_delete(packets[i]); + dropped_cnt++; + } + } + + dp_netdev_count_packet(pmd, DP_STAT_MISS, dropped_cnt); + dp_netdev_count_packet(pmd, DP_STAT_LOST, dropped_cnt); + } + + n_batches = 0; + for (i = 0; i < cnt; i++) { + struct dp_packet *packet = packets[i]; + struct dp_netdev_flow *flow; + + if (OVS_UNLIKELY(!rules[i])) { + continue; + } + + flow = dp_netdev_flow_cast(rules[i]); + + emc_insert(flow_cache, &keys[i], flow); + dp_netdev_queue_batches(packet, flow, &keys[i].mf, batches, + &n_batches, ARRAY_SIZE(batches)); + } + + for (i = 0; i < n_batches; i++) { + packet_batch_execute(&batches[i], pmd, DP_STAT_MASKED_HIT, now); + } +} + +static void +dp_netdev_input(struct dp_netdev_pmd_thread *pmd, + struct dp_packet **packets, int cnt) +{ +#if !defined(__CHECKER__) && !defined(_WIN32) + const size_t PKT_ARRAY_SIZE = cnt; +#else + /* Sparse or MSVC doesn't like variable length array. */ + enum { PKT_ARRAY_SIZE = NETDEV_MAX_RX_BATCH }; +#endif + struct netdev_flow_key keys[PKT_ARRAY_SIZE]; + long long now = time_msec(); + size_t newcnt; + + newcnt = emc_processing(pmd, packets, cnt, keys, now); + if (OVS_UNLIKELY(newcnt)) { + fast_path_processing(pmd, packets, newcnt, keys, now); + } } struct dp_netdev_execute_aux { - struct dp_netdev *dp; - const struct miniflow *key; + struct dp_netdev_pmd_thread *pmd; }; static void -dp_execute_cb(void *aux_, struct ofpbuf *packet, - struct pkt_metadata *md, +dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb, + void *aux) +{ + struct dp_netdev *dp = get_dp_netdev(dpif); + dp->upcall_aux = aux; + dp->upcall_cb = cb; +} + +static void +dp_netdev_drop_packets(struct dp_packet ** packets, int cnt, bool may_steal) +{ + if (may_steal) { + int i; + + for (i = 0; i < cnt; i++) { + dp_packet_delete(packets[i]); + } + } +} + +static int +push_tnl_action(const struct dp_netdev *dp, + const struct nlattr *attr, + struct dp_packet **packets, int cnt) +{ + struct dp_netdev_port *tun_port; + const struct ovs_action_push_tnl *data; + + data = nl_attr_get(attr); + + tun_port = dp_netdev_lookup_port(dp, u32_to_odp(data->tnl_port)); + if (!tun_port) { + return -EINVAL; + } + netdev_push_header(tun_port->netdev, packets, cnt, data); + + return 0; +} + +static void +dp_netdev_clone_pkt_batch(struct dp_packet **tnl_pkt, + struct dp_packet **packets, int cnt) +{ + int i; + + for (i = 0; i < cnt; i++) { + tnl_pkt[i] = dp_packet_clone(packets[i]); + } +} + +static void +dp_execute_cb(void *aux_, struct dp_packet **packets, int cnt, const struct nlattr *a, bool may_steal) OVS_NO_THREAD_SAFETY_ANALYSIS { struct dp_netdev_execute_aux *aux = aux_; + uint32_t *depth = recirc_depth_get(); + struct dp_netdev_pmd_thread *pmd= aux->pmd; + struct dp_netdev *dp= pmd->dp; int type = nl_attr_type(a); struct dp_netdev_port *p; - uint32_t *depth = recirc_depth_get(); + int i; switch ((enum 
ovs_action_attr)type) { case OVS_ACTION_ATTR_OUTPUT: - p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a))); - if (p) { - netdev_send(p->netdev, packet, may_steal); + p = dp_netdev_lookup_port(dp, u32_to_odp(nl_attr_get_u32(a))); + if (OVS_LIKELY(p)) { + netdev_send(p->netdev, pmd->core_id, packets, cnt, may_steal); + return; } break; - case OVS_ACTION_ATTR_USERSPACE: { - const struct nlattr *userdata; - - userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); + case OVS_ACTION_ATTR_TUNNEL_PUSH: + if (*depth < MAX_RECIRC_DEPTH) { + struct dp_packet *tnl_pkt[NETDEV_MAX_RX_BATCH]; + int err; - dp_netdev_output_userspace(aux->dp, packet, - miniflow_hash_5tuple(aux->key, 0) - % aux->dp->n_handlers, - DPIF_UC_ACTION, aux->key, - userdata); + if (!may_steal) { + dp_netdev_clone_pkt_batch(tnl_pkt, packets, cnt); + packets = tnl_pkt; + } - if (may_steal) { - ofpbuf_delete(packet); + err = push_tnl_action(dp, a, packets, cnt); + if (!err) { + (*depth)++; + dp_netdev_input(pmd, packets, cnt); + (*depth)--; + } else { + dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal); + } + return; } break; - } - case OVS_ACTION_ATTR_HASH: { - const struct ovs_action_hash *hash_act; - uint32_t hash; + case OVS_ACTION_ATTR_TUNNEL_POP: + if (*depth < MAX_RECIRC_DEPTH) { + odp_port_t portno = u32_to_odp(nl_attr_get_u32(a)); - hash_act = nl_attr_get(a); - if (hash_act->hash_alg == OVS_HASH_ALG_L4) { - /* Hash need not be symmetric, nor does it need to include - * L2 fields. */ - hash = miniflow_hash_5tuple(aux->key, hash_act->hash_basis); - if (!hash) { - hash = 1; /* 0 is not valid */ - } + p = dp_netdev_lookup_port(dp, portno); + if (p) { + struct dp_packet *tnl_pkt[NETDEV_MAX_RX_BATCH]; + int err; - } else { - VLOG_WARN("Unknown hash algorithm specified for the hash action."); - hash = 2; + if (!may_steal) { + dp_netdev_clone_pkt_batch(tnl_pkt, packets, cnt); + packets = tnl_pkt; + } + + err = netdev_pop_header(p->netdev, packets, cnt); + if (!err) { + + for (i = 0; i < cnt; i++) { + packets[i]->md.in_port.odp_port = portno; + } + + (*depth)++; + dp_netdev_input(pmd, packets, cnt); + (*depth)--; + } else { + dp_netdev_drop_packets(tnl_pkt, cnt, !may_steal); + } + return; + } } + break; + + case OVS_ACTION_ATTR_USERSPACE: + if (!fat_rwlock_tryrdlock(&dp->upcall_rwlock)) { + const struct nlattr *userdata; + struct ofpbuf actions; + struct flow flow; + ovs_u128 ufid; + + userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA); + ofpbuf_init(&actions, 0); + + for (i = 0; i < cnt; i++) { + int error; + + ofpbuf_clear(&actions); + + flow_extract(packets[i], &flow); + dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid); + error = dp_netdev_upcall(pmd, packets[i], &flow, NULL, &ufid, + DPIF_UC_ACTION, userdata,&actions, + NULL); + if (!error || error == ENOSPC) { + dp_netdev_execute_actions(pmd, &packets[i], 1, may_steal, + actions.data, actions.size); + } else if (may_steal) { + dp_packet_delete(packets[i]); + } + } + ofpbuf_uninit(&actions); + fat_rwlock_unlock(&dp->upcall_rwlock); - md->dp_hash = hash; + return; + } break; - } case OVS_ACTION_ATTR_RECIRC: if (*depth < MAX_RECIRC_DEPTH) { - struct pkt_metadata recirc_md = *md; - struct ofpbuf *recirc_packet; - - recirc_packet = may_steal ? packet : ofpbuf_clone(packet); - recirc_md.recirc_id = nl_attr_get_u32(a); (*depth)++; - dp_netdev_input(aux->dp, recirc_packet, &recirc_md); + for (i = 0; i < cnt; i++) { + struct dp_packet *recirc_pkt; + + recirc_pkt = (may_steal) ? 
packets[i] + : dp_packet_clone(packets[i]); + + recirc_pkt->md.recirc_id = nl_attr_get_u32(a); + + dp_netdev_input(pmd, &recirc_pkt, 1); + } (*depth)--; - break; - } else { - VLOG_WARN("Packet dropped. Max recirculation depth exceeded."); + return; } + + VLOG_WARN("Packet dropped. Max recirculation depth exceeded."); break; case OVS_ACTION_ATTR_PUSH_VLAN: @@ -2197,27 +3510,32 @@ dp_execute_cb(void *aux_, struct ofpbuf *packet, case OVS_ACTION_ATTR_PUSH_MPLS: case OVS_ACTION_ATTR_POP_MPLS: case OVS_ACTION_ATTR_SET: + case OVS_ACTION_ATTR_SET_MASKED: case OVS_ACTION_ATTR_SAMPLE: + case OVS_ACTION_ATTR_HASH: case OVS_ACTION_ATTR_UNSPEC: case __OVS_ACTION_ATTR_MAX: OVS_NOT_REACHED(); } + + dp_netdev_drop_packets(packets, cnt, may_steal); } static void -dp_netdev_execute_actions(struct dp_netdev *dp, const struct miniflow *key, - struct ofpbuf *packet, bool may_steal, - struct pkt_metadata *md, +dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd, + struct dp_packet **packets, int cnt, + bool may_steal, const struct nlattr *actions, size_t actions_len) { - struct dp_netdev_execute_aux aux = {dp, key}; + struct dp_netdev_execute_aux aux = { pmd }; - odp_execute_actions(&aux, packet, may_steal, md, - actions, actions_len, dp_execute_cb); + odp_execute_actions(&aux, packets, cnt, may_steal, actions, + actions_len, dp_execute_cb); } const struct dpif_class dpif_netdev_class = { "netdev", + dpif_netdev_init, dpif_netdev_enumerate, dpif_netdev_port_open_type, dpif_netdev_open, @@ -2236,31 +3554,32 @@ const struct dpif_class dpif_netdev_class = { dpif_netdev_port_dump_done, dpif_netdev_port_poll, dpif_netdev_port_poll_wait, - dpif_netdev_flow_get, - dpif_netdev_flow_put, - dpif_netdev_flow_del, dpif_netdev_flow_flush, - dpif_netdev_flow_dump_state_init, - dpif_netdev_flow_dump_start, + dpif_netdev_flow_dump_create, + dpif_netdev_flow_dump_destroy, + dpif_netdev_flow_dump_thread_create, + dpif_netdev_flow_dump_thread_destroy, dpif_netdev_flow_dump_next, - NULL, - dpif_netdev_flow_dump_done, - dpif_netdev_flow_dump_state_uninit, - dpif_netdev_execute, - NULL, /* operate */ - dpif_netdev_recv_set, - dpif_netdev_handlers_set, + dpif_netdev_operate, + NULL, /* recv_set */ + NULL, /* handlers_set */ + dpif_netdev_pmd_set, dpif_netdev_queue_to_priority, - dpif_netdev_recv, - dpif_netdev_recv_wait, - dpif_netdev_recv_purge, + NULL, /* recv */ + NULL, /* recv_wait */ + NULL, /* recv_purge */ + dpif_netdev_register_upcall_cb, + dpif_netdev_enable_upcall, + dpif_netdev_disable_upcall, + dpif_netdev_get_datapath_version, }; static void dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED, const char *argv[], void *aux OVS_UNUSED) { - struct dp_netdev_port *port; + struct dp_netdev_port *old_port; + struct dp_netdev_port *new_port; struct dp_netdev *dp; odp_port_t port_no; @@ -2274,8 +3593,8 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED, ovs_refcount_ref(&dp->ref_cnt); ovs_mutex_unlock(&dp_netdev_mutex); - ovs_rwlock_wrlock(&dp->port_rwlock); - if (get_port_by_name(dp, argv[2], &port)) { + ovs_mutex_lock(&dp->port_mutex); + if (get_port_by_name(dp, argv[2], &old_port)) { unixctl_command_reply_error(conn, "unknown port"); goto exit; } @@ -2289,14 +3608,52 @@ dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED, unixctl_command_reply_error(conn, "port number already in use"); goto exit; } - hmap_remove(&dp->ports, &port->node); - port->port_no = port_no; - hmap_insert(&dp->ports, &port->node, hash_int(odp_to_u32(port_no), 0)); + 
+ /* Remove old port. */ + cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no)); + ovsrcu_postpone(free, old_port); + + /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */ + new_port = xmemdup(old_port, sizeof *old_port); + new_port->port_no = port_no; + cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no)); + seq_change(dp->port_seq); unixctl_command_reply(conn, NULL); exit: - ovs_rwlock_unlock(&dp->port_rwlock); + ovs_mutex_unlock(&dp->port_mutex); + dp_netdev_unref(dp); +} + +static void +dpif_dummy_delete_port(struct unixctl_conn *conn, int argc OVS_UNUSED, + const char *argv[], void *aux OVS_UNUSED) +{ + struct dp_netdev_port *port; + struct dp_netdev *dp; + + ovs_mutex_lock(&dp_netdev_mutex); + dp = shash_find_data(&dp_netdevs, argv[1]); + if (!dp || !dpif_netdev_class_is_dummy(dp->class)) { + ovs_mutex_unlock(&dp_netdev_mutex); + unixctl_command_reply_error(conn, "unknown datapath or not a dummy"); + return; + } + ovs_refcount_ref(&dp->ref_cnt); + ovs_mutex_unlock(&dp_netdev_mutex); + + ovs_mutex_lock(&dp->port_mutex); + if (get_port_by_name(dp, argv[2], &port)) { + unixctl_command_reply_error(conn, "unknown port"); + } else if (port->port_no == ODPP_LOCAL) { + unixctl_command_reply_error(conn, "can't delete local port"); + } else { + do_del_port(dp, port); + unixctl_command_reply(conn, NULL); + } + ovs_mutex_unlock(&dp->port_mutex); + dp_netdev_unref(dp); } @@ -2331,6 +3688,217 @@ dpif_dummy_register(bool override) dpif_dummy_register__("dummy"); unixctl_command_register("dpif-dummy/change-port-number", - "DP PORT NEW-NUMBER", + "dp port new-number", 3, 3, dpif_dummy_change_port_number, NULL); + unixctl_command_register("dpif-dummy/delete-port", "dp port", + 2, 2, dpif_dummy_delete_port, NULL); +} + +/* Datapath Classifier. */ + +/* A set of rules that all have the same fields wildcarded. */ +struct dpcls_subtable { + /* The fields are only used by writers. */ + struct cmap_node cmap_node OVS_GUARDED; /* Within dpcls 'subtables_map'. */ + + /* These fields are accessed by readers. */ + struct cmap rules; /* Contains "struct dpcls_rule"s. */ + struct netdev_flow_key mask; /* Wildcards for fields (const). */ + /* 'mask' must be the last field, additional space is allocated here. */ +}; + +/* Initializes 'cls' as a classifier that initially contains no classification + * rules. */ +static void +dpcls_init(struct dpcls *cls) +{ + cmap_init(&cls->subtables_map); + pvector_init(&cls->subtables); +} + +static void +dpcls_destroy_subtable(struct dpcls *cls, struct dpcls_subtable *subtable) +{ + pvector_remove(&cls->subtables, subtable); + cmap_remove(&cls->subtables_map, &subtable->cmap_node, + subtable->mask.hash); + cmap_destroy(&subtable->rules); + ovsrcu_postpone(free, subtable); +} + +/* Destroys 'cls'. Rules within 'cls', if any, are not freed; this is the + * caller's responsibility. + * May only be called after all the readers have been terminated. */ +static void +dpcls_destroy(struct dpcls *cls) +{ + if (cls) { + struct dpcls_subtable *subtable; + + CMAP_FOR_EACH (subtable, cmap_node, &cls->subtables_map) { + dpcls_destroy_subtable(cls, subtable); + } + cmap_destroy(&cls->subtables_map); + pvector_destroy(&cls->subtables); + } +} + +static struct dpcls_subtable * +dpcls_create_subtable(struct dpcls *cls, const struct netdev_flow_key *mask) +{ + struct dpcls_subtable *subtable; + + /* Need to add one. 
*/ + subtable = xmalloc(sizeof *subtable + - sizeof subtable->mask.mf + mask->len); + cmap_init(&subtable->rules); + netdev_flow_key_clone(&subtable->mask, mask); + cmap_insert(&cls->subtables_map, &subtable->cmap_node, mask->hash); + pvector_insert(&cls->subtables, subtable, 0); + pvector_publish(&cls->subtables); + + return subtable; +} + +static inline struct dpcls_subtable * +dpcls_find_subtable(struct dpcls *cls, const struct netdev_flow_key *mask) +{ + struct dpcls_subtable *subtable; + + CMAP_FOR_EACH_WITH_HASH (subtable, cmap_node, mask->hash, + &cls->subtables_map) { + if (netdev_flow_key_equal(&subtable->mask, mask)) { + return subtable; + } + } + return dpcls_create_subtable(cls, mask); +} + +/* Insert 'rule' into 'cls'. */ +static void +dpcls_insert(struct dpcls *cls, struct dpcls_rule *rule, + const struct netdev_flow_key *mask) +{ + struct dpcls_subtable *subtable = dpcls_find_subtable(cls, mask); + + rule->mask = &subtable->mask; + cmap_insert(&subtable->rules, &rule->cmap_node, rule->flow.hash); +} + +/* Removes 'rule' from 'cls', also destructing the 'rule'. */ +static void +dpcls_remove(struct dpcls *cls, struct dpcls_rule *rule) +{ + struct dpcls_subtable *subtable; + + ovs_assert(rule->mask); + + INIT_CONTAINER(subtable, rule->mask, mask); + + if (cmap_remove(&subtable->rules, &rule->cmap_node, rule->flow.hash) + == 0) { + dpcls_destroy_subtable(cls, subtable); + pvector_publish(&cls->subtables); + } +} + +/* Returns true if 'target' satisifies 'key' in 'mask', that is, if each 1-bit + * in 'mask' the values in 'key' and 'target' are the same. + * + * Note: 'key' and 'mask' have the same mask, and 'key' is already masked. */ +static inline bool +dpcls_rule_matches_key(const struct dpcls_rule *rule, + const struct netdev_flow_key *target) +{ + const uint64_t *keyp = rule->flow.mf.inline_values; + const uint64_t *maskp = rule->mask->mf.inline_values; + uint64_t target_u64; + + NETDEV_FLOW_KEY_FOR_EACH_IN_MAP(target_u64, target, rule->flow.mf.map) { + if (OVS_UNLIKELY((target_u64 & *maskp++) != *keyp++)) { + return false; + } + } + return true; +} + +/* For each miniflow in 'flows' performs a classifier lookup writing the result + * into the corresponding slot in 'rules'. If a particular entry in 'flows' is + * NULL it is skipped. + * + * This function is optimized for use in the userspace datapath and therefore + * does not implement a lot of features available in the standard + * classifier_lookup() function. Specifically, it does not implement + * priorities, instead returning any rule which matches the flow. + * + * Returns true if all flows found a corresponding rule. */ +static bool +dpcls_lookup(const struct dpcls *cls, const struct netdev_flow_key keys[], + struct dpcls_rule **rules, const size_t cnt) +{ + /* The batch size 16 was experimentally found faster than 8 or 32. */ + typedef uint16_t map_type; +#define MAP_BITS (sizeof(map_type) * CHAR_BIT) + +#if !defined(__CHECKER__) && !defined(_WIN32) + const int N_MAPS = DIV_ROUND_UP(cnt, MAP_BITS); +#else + enum { N_MAPS = DIV_ROUND_UP(NETDEV_MAX_RX_BATCH, MAP_BITS) }; +#endif + map_type maps[N_MAPS]; + struct dpcls_subtable *subtable; + + memset(maps, 0xff, sizeof maps); + if (cnt % MAP_BITS) { + maps[N_MAPS - 1] >>= MAP_BITS - cnt % MAP_BITS; /* Clear extra bits. 
*/ + } + memset(rules, 0, cnt * sizeof *rules); + + PVECTOR_FOR_EACH (subtable, &cls->subtables) { + const struct netdev_flow_key *mkeys = keys; + struct dpcls_rule **mrules = rules; + map_type remains = 0; + int m; + + BUILD_ASSERT_DECL(sizeof remains == sizeof *maps); + + for (m = 0; m < N_MAPS; m++, mkeys += MAP_BITS, mrules += MAP_BITS) { + uint32_t hashes[MAP_BITS]; + const struct cmap_node *nodes[MAP_BITS]; + unsigned long map = maps[m]; + int i; + + if (!map) { + continue; /* Skip empty maps. */ + } + + /* Compute hashes for the remaining keys. */ + ULONG_FOR_EACH_1(i, map) { + hashes[i] = netdev_flow_key_hash_in_mask(&mkeys[i], + &subtable->mask); + } + /* Lookup. */ + map = cmap_find_batch(&subtable->rules, map, hashes, nodes); + /* Check results. */ + ULONG_FOR_EACH_1(i, map) { + struct dpcls_rule *rule; + + CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) { + if (OVS_LIKELY(dpcls_rule_matches_key(rule, &mkeys[i]))) { + mrules[i] = rule; + goto next; + } + } + ULONG_SET0(map, i); /* Did not match. */ + next: + ; /* Keep Sparse happy. */ + } + maps[m] &= ~map; /* Clear the found rules. */ + remains |= maps[m]; + } + if (!remains) { + return true; /* All found. */ + } + } + return false; /* Some misses. */ }
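dp_netdev_input() above first runs each batch through emc_processing() and only hands the remaining misses to fast_path_processing(), which consults the dpcls classifier (or adds a new flow under flow_mutex) and then feeds the result back into the exact match cache with emc_insert(). The following is a minimal stand-alone sketch of that two-stage structure; all toy_* types and names are hypothetical stand-ins, not the OVS data structures.

/* Illustrative only: a direct-mapped exact-match cache in front of a slower
 * classifier lookup.  Hypothetical types; not the OVS implementation. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define CACHE_ENTRIES 1024              /* Must be a power of two. */
#define CACHE_MASK (CACHE_ENTRIES - 1)

struct toy_key { uint32_t hash; uint64_t fields[4]; };
struct toy_flow { int id; };

struct toy_cache_entry { struct toy_key key; struct toy_flow *flow; };
static struct toy_cache_entry cache[CACHE_ENTRIES];

static bool
key_equal(const struct toy_key *a, const struct toy_key *b)
{
    return a->hash == b->hash && !memcmp(a->fields, b->fields, sizeof a->fields);
}

/* Stage 1: exact-match probe (the fast path). */
static struct toy_flow *
cache_lookup(const struct toy_key *key)
{
    struct toy_cache_entry *e = &cache[key->hash & CACHE_MASK];

    return e->flow && key_equal(&e->key, key) ? e->flow : NULL;
}

/* Populating the cache after a successful slow-path lookup. */
static void
cache_insert(const struct toy_key *key, struct toy_flow *flow)
{
    struct toy_cache_entry *e = &cache[key->hash & CACHE_MASK];

    e->key = *key;
    e->flow = flow;
}

/* Stage 2: stand-in for the wildcarded classifier lookup. */
static struct toy_flow *
slow_classifier_lookup(const struct toy_key *key)
{
    static struct toy_flow flows[2] = { { 0 }, { 1 } };

    return &flows[key->hash & 1];
}

static struct toy_flow *
two_stage_lookup(const struct toy_key *key)
{
    struct toy_flow *flow = cache_lookup(key);

    if (!flow) {
        flow = slow_classifier_lookup(key);
        if (flow) {
            cache_insert(key, flow);
        }
    }
    return flow;
}

int
main(void)
{
    struct toy_key k = { .hash = 42, .fields = { 1, 2, 3, 4 } };

    printf("first lookup:  flow %d\n", two_stage_lookup(&k)->id);  /* Cache miss. */
    printf("second lookup: flow %d\n", two_stage_lookup(&k)->id);  /* Cache hit. */
    return 0;
}

The real cache probes more than one slot per hash and keys on a full miniflow rather than a fixed field array, but the control flow is the same: probe the cache, fall back to the classifier, and refill the cache on success.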
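fast_path_processing() also groups packets that matched the same flow with dp_netdev_queue_batches() and then runs packet_batch_execute() once per batch, so the flow's actions are applied per group rather than per packet. A hedged sketch of that grouping idea, again with hypothetical types:

/* Illustrative only: grouping looked-up packets into per-flow batches.
 * Hypothetical types; not the OVS structures. */
#include <stddef.h>
#include <stdio.h>

#define BATCH_MAX 32

struct toy_flow { int id; };
struct toy_pkt { int id; const struct toy_flow *flow; };

struct toy_batch {
    const struct toy_flow *flow;
    const struct toy_pkt *pkts[BATCH_MAX];
    size_t n;
};

/* Appends 'pkt' to the batch for its flow, starting a new batch if needed.
 * A linear search over the open batches keeps the sketch simple. */
static void
queue_batch(const struct toy_pkt *pkt, struct toy_batch batches[],
            size_t *n_batches, size_t max_batches)
{
    for (size_t i = 0; i < *n_batches; i++) {
        if (batches[i].flow == pkt->flow && batches[i].n < BATCH_MAX) {
            batches[i].pkts[batches[i].n++] = pkt;
            return;
        }
    }
    if (*n_batches < max_batches) {
        struct toy_batch *b = &batches[(*n_batches)++];

        b->flow = pkt->flow;
        b->pkts[0] = pkt;
        b->n = 1;
    }
}

/* Stand-in for executing a batch: act once per flow, not once per packet. */
static void
execute_batch(const struct toy_batch *b)
{
    printf("flow %d: executing actions on %zu packets\n", b->flow->id, b->n);
}

Batching is what makes the "for (i = 0; i < n_batches; i++) packet_batch_execute(...)" loop above cheap even when a burst of packets all hit the same flow.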
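Several cases in dp_execute_cb() (tunnel push, tunnel pop, recirculation) re-enter dp_netdev_input() only while *depth stays below MAX_RECIRC_DEPTH, and they clone the packet batch first when may_steal is false so the caller keeps its own copies. A simplified sketch of that ownership pattern follows; the buf_* names are invented here and do not correspond to OVS functions.

/* Illustrative only: 'may_steal' ownership plus a depth limit on re-entry. */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define MAX_DEPTH 5

struct toy_buf { char data[64]; };

static struct toy_buf *
buf_clone(const struct toy_buf *b)
{
    struct toy_buf *copy = malloc(sizeof *copy);

    if (!copy) {
        abort();
    }
    memcpy(copy, b, sizeof *copy);
    return copy;
}

/* Takes ownership of 'b' and frees it when done. */
static void
consume(struct toy_buf *b, int depth)
{
    printf("processed \"%s\" at depth %d\n", b->data, depth);
    free(b);
}

/* Re-enters processing for 'b'.  If the caller still owns the buffer
 * (may_steal is false), work on a private clone, as the tnl_pkt[] copies do
 * above; on hitting the depth limit, free the buffer only when we own it. */
static void
reprocess(struct toy_buf *b, bool may_steal, int depth)
{
    if (depth >= MAX_DEPTH) {
        fprintf(stderr, "dropped: max recirculation depth exceeded\n");
        if (may_steal) {
            free(b);
        }
        return;
    }
    if (!may_steal) {
        b = buf_clone(b);
    }
    consume(b, depth + 1);
}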
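The OVS_ACTION_ATTR_USERSPACE case performs a per-packet upcall and executes whatever actions come back, treating ENOSPC the same as success and deleting the packet on other errors only when it owns it. A rough stand-alone model of that control flow; do_upcall(), execute_actions() and the toy_pkt type are hypothetical, not the OVS API.

/* Illustrative only: per-packet upcall with ownership-aware error handling. */
#include <errno.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

struct toy_pkt { int id; };

/* Hypothetical upcall: writes an action string and returns 0, ENOSPC, or
 * another errno value. */
static int
do_upcall(const struct toy_pkt *p, char *actions, size_t size)
{
    return snprintf(actions, size, "output:%d", p->id) < (int) size ? 0 : ENOSPC;
}

/* Hypothetical action execution; frees the packet when it owns it. */
static void
execute_actions(struct toy_pkt *p, const char *actions, bool may_steal)
{
    printf("pkt %d: %s\n", p->id, actions);
    if (may_steal) {
        free(p);
    }
}

static void
userspace_action(struct toy_pkt **pkts, size_t cnt, bool may_steal)
{
    for (size_t i = 0; i < cnt; i++) {
        char actions[64];
        int error = do_upcall(pkts[i], actions, sizeof actions);

        if (!error || error == ENOSPC) {
            /* Mirrors the "!error || error == ENOSPC" check above. */
            execute_actions(pkts[i], actions, may_steal);
        } else if (may_steal) {
            free(pkts[i]);      /* We own the packet and nobody else will. */
        }
    }
}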
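dpcls_rule_matches_key() relies on the stored flow being pre-masked, so a rule matches when every 64-bit word of the target, ANDed with the subtable mask, equals the stored word. The comparison on its own, as a small sketch with generic word arrays instead of miniflows:

/* Illustrative only: masked comparison against an already-masked key. */
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

static bool
masked_match(const uint64_t target[], const uint64_t mask[],
             const uint64_t masked_key[], size_t n_words)
{
    for (size_t i = 0; i < n_words; i++) {
        if ((target[i] & mask[i]) != masked_key[i]) {
            return false;
        }
    }
    return true;
}

Because the key is stored pre-masked, verifying each candidate costs one AND and one compare per word, which is what keeps the batched lookup below cheap once cmap_find_batch() has produced its candidates.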
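dpcls_lookup() keeps a 16-bit map per group of packets, clears bits as subtables resolve them, and stops as soon as the map is empty. A compact sketch of that bitmap-driven control flow; subtable_fn and match_even_keys() are hypothetical stand-ins for one subtable's hash-and-probe pass, not OVS functions.

/* Illustrative only: tracking a batch of lookups with one bit per packet. */
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define BATCH_MAX 16                    /* One bit per packet in a uint16_t. */

typedef uint16_t (*subtable_fn)(uint16_t todo, const uint32_t keys[],
                                int results[]);

/* Returns true if every key found a result. */
static bool
batch_lookup(const uint32_t keys[], size_t cnt, int results[],
             const subtable_fn subtables[], size_t n_subtables)
{
    uint16_t todo;

    assert(cnt <= BATCH_MAX);
    todo = cnt == BATCH_MAX ? UINT16_MAX : (uint16_t) ((1u << cnt) - 1);

    for (size_t s = 0; s < n_subtables && todo; s++) {
        /* Each pass only considers the still-unresolved packets and reports
         * which of them it matched, like the cmap_find_batch() call plus the
         * rule-verification loop above. */
        uint16_t found = subtables[s](todo, keys, results);

        todo &= ~found;
    }
    return !todo;
}

/* Example pass: pretend this subtable matches every even key. */
static uint16_t
match_even_keys(uint16_t todo, const uint32_t keys[], int results[])
{
    uint16_t found = 0;

    for (int i = 0; i < BATCH_MAX; i++) {
        if ((todo & (1u << i)) && !(keys[i] & 1)) {
            results[i] = 1;
            found |= 1u << i;
        }
    }
    return found;
}

The early exit when the map empties mirrors the "if (!remains) return true;" shortcut above, so lower-priority subtables in the pvector are never consulted for packets that have already matched.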