connmgr: Fix packet-in reason for OpenFlow1.3 table-miss flow entries.
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
index ae856a4..24a5729 100644
 #include <stdbool.h>
 #include <inttypes.h>
 
+#include "connmgr.h"
 #include "coverage.h"
 #include "dynamic-string.h"
 #include "dpif.h"
 #include "fail-open.h"
+#include "guarded-list.h"
 #include "latch.h"
 #include "seq.h"
 #include "list.h"
 #include "netlink.h"
 #include "ofpbuf.h"
-#include "ofproto-dpif.h"
+#include "ofproto-dpif-ipfix.h"
+#include "ofproto-dpif-sflow.h"
 #include "packets.h"
 #include "poll-loop.h"
 #include "vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
 
-COVERAGE_DEFINE(upcall_queue_overflow);
 COVERAGE_DEFINE(drop_queue_overflow);
-COVERAGE_DEFINE(miss_queue_overflow);
+COVERAGE_DEFINE(upcall_queue_overflow);
 COVERAGE_DEFINE(fmb_queue_overflow);
+COVERAGE_DEFINE(fmb_queue_revalidated);
 
 /* A thread that processes each upcall handed to it by the dispatcher thread,
  * forwards the upcall's packet, and then queues it to the main ofproto_dpif
@@ -51,11 +54,12 @@ struct handler {
 
     struct ovs_mutex mutex;            /* Mutex guarding the following. */
 
-    /* Atomic queue of unprocessed miss upcalls. */
+    /* Atomic queue of unprocessed upcalls. */
     struct list upcalls OVS_GUARDED;
     size_t n_upcalls OVS_GUARDED;
 
     size_t n_new_upcalls;              /* Only changed by the dispatcher. */
+    bool need_signal;                  /* Only changed by the dispatcher. */
 
     pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
                                           'mutex'. */
@@ -76,40 +80,48 @@ struct udpif {
 
     pthread_t dispatcher;              /* Dispatcher thread ID. */
 
-    struct handler *handlers;          /* Miss handlers. */
+    struct handler *handlers;          /* Upcall handlers. */
     size_t n_handlers;
 
-    /* Atomic queue of unprocessed drop keys. */
-    struct ovs_mutex drop_key_mutex;
-    struct list drop_keys OVS_GUARDED;
-    size_t n_drop_keys OVS_GUARDED;
-
-    /* Atomic queue of special upcalls for ofproto-dpif to process. */
-    struct ovs_mutex upcall_mutex;
-    struct list upcalls OVS_GUARDED;
-    size_t n_upcalls OVS_GUARDED;
-
-    /* Atomic queue of flow_miss_batches. */
-    struct ovs_mutex fmb_mutex;
-    struct list fmbs OVS_GUARDED;
-    size_t n_fmbs OVS_GUARDED;
+    /* Queues to pass up to ofproto-dpif. */
+    struct guarded_list drop_keys; /* "struct drop_key"s. */
+    struct guarded_list fmbs;      /* "struct flow_miss_batch"es. */
 
     /* Number of times udpif_revalidate() has been called. */
     atomic_uint reval_seq;
 
     struct seq *wait_seq;
-    uint64_t last_seq;
 
     struct latch exit_latch; /* Tells child threads to exit. */
 };
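
The three mutex/list/counter triples that used to live in struct udpif collapse into struct guarded_list, a thread-safe list from the new guarded-list.h include. Below is a minimal interface sketch, inferred from the call sites in this patch; guarded-list.h is authoritative, and the push_back return convention in particular is an assumption:

/* Sketch of the guarded-list interface this patch relies on; inferred
 * from call sites, see guarded-list.h for the real declarations. */
struct guarded_list {
    struct ovs_mutex mutex;          /* Protects 'list' and 'n'. */
    struct list list;
    size_t n;                        /* Number of elements in 'list'. */
};

void guarded_list_init(struct guarded_list *);
void guarded_list_destroy(struct guarded_list *);
bool guarded_list_is_empty(const struct guarded_list *);

/* Appends 'node' unless 'max' elements are already queued; assumed to
 * return the new size on success and 0 if the list was full. */
size_t guarded_list_push_back(struct guarded_list *, struct list *node,
                              size_t max);

/* Removes and returns the first node, or NULL if the list is empty. */
struct list *guarded_list_pop_front(struct guarded_list *);

/* Moves the whole contents into 'list', leaving the guarded list empty. */
void guarded_list_pop_all(struct guarded_list *, struct list *list);

Centralizing the locking this way removes three mutexes from struct udpif and makes the MAX_QUEUE_LENGTH overflow check part of the push itself.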
 
+enum upcall_type {
+    BAD_UPCALL,                 /* Some kind of bug somewhere. */
+    MISS_UPCALL,                /* A flow miss. */
+    SFLOW_UPCALL,               /* sFlow sample. */
+    FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
+    IPFIX_UPCALL                /* Per-bridge sampling. */
+};
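
classify_upcall(), whose hunk header appears further down, maps each received dpif_upcall onto this enum. A hedged sketch of the classification rule, assuming the usual user_action_cookie layout; the real function also validates attribute sizes before trusting the cookie:

/* Hedged sketch of classify_upcall(): kernel misses are MISS_UPCALLs
 * outright; DPIF_UC_ACTION upcalls carry a user_action_cookie in their
 * userdata attribute whose 'type' names the intended consumer. */
static enum upcall_type
classify_upcall_sketch(const struct upcall *upcall)
{
    const struct dpif_upcall *dupcall = &upcall->dpif_upcall;
    union user_action_cookie cookie;

    if (dupcall->type == DPIF_UC_MISS) {
        return MISS_UPCALL;                 /* No datapath flow matched. */
    } else if (dupcall->type != DPIF_UC_ACTION || !dupcall->userdata) {
        return BAD_UPCALL;
    }

    memset(&cookie, 0, sizeof cookie);
    memcpy(&cookie, nl_attr_get(dupcall->userdata),
           MIN(nl_attr_get_size(dupcall->userdata), sizeof cookie));

    switch (cookie.type) {
    case USER_ACTION_COOKIE_SFLOW:       return SFLOW_UPCALL;
    case USER_ACTION_COOKIE_FLOW_SAMPLE: return FLOW_SAMPLE_UPCALL;
    case USER_ACTION_COOKIE_IPFIX:       return IPFIX_UPCALL;
    case USER_ACTION_COOKIE_SLOW_PATH:   return MISS_UPCALL;
    default:                             return BAD_UPCALL;
    }
}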
+
+struct upcall {
+    struct list list_node;          /* For queuing upcalls. */
+    struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
+
+    /* Raw upcall plus data for keeping track of the memory backing it. */
+    struct dpif_upcall dpif_upcall; /* As returned by dpif_recv(). */
+    struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
+    uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
+};
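
The stub array lets dpif_recv() hand back small upcalls without a heap allocation. A sketch of how the receive path presumably wires the three fields together; ofpbuf_use_stub() falls back to malloc() only when the data outgrows the 64-byte stub:

/* Sketch: back 'upcall_buf' with the inline stub before receiving.
 * The buffer then owns whatever 'dpif_upcall' points into. */
struct upcall *upcall = xmalloc(sizeof *upcall);
int error;

ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
                sizeof upcall->upcall_stub);
error = dpif_recv(udpif->dpif, &upcall->dpif_upcall, &upcall->upcall_buf);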
+
+static void upcall_destroy(struct upcall *);
+
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
 static void recv_upcalls(struct udpif *);
-static void handle_miss_upcalls(struct udpif *, struct list *upcalls);
+static void handle_upcalls(struct udpif *, struct list *upcalls);
 static void miss_destroy(struct flow_miss *);
 static void *udpif_dispatcher(void *);
-static void *udpif_miss_handler(void *);
+static void *udpif_upcall_handler(void *);
 
 struct udpif *
 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
@@ -121,13 +133,9 @@ udpif_create(struct dpif_backer *backer, struct dpif *dpif)
     udpif->secret = random_uint32();
     udpif->wait_seq = seq_create();
     latch_init(&udpif->exit_latch);
-    list_init(&udpif->drop_keys);
-    list_init(&udpif->upcalls);
-    list_init(&udpif->fmbs);
+    guarded_list_init(&udpif->drop_keys);
+    guarded_list_init(&udpif->fmbs);
     atomic_init(&udpif->reval_seq, 0);
-    ovs_mutex_init(&udpif->drop_key_mutex);
-    ovs_mutex_init(&udpif->upcall_mutex);
-    ovs_mutex_init(&udpif->fmb_mutex);
 
     return udpif;
 }
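
For orientation, the expected lifecycle of a udpif, pieced together from the functions in this file; the real call sites live in ofproto-dpif.c:

/* Hedged usage sketch: create a udpif, start N handler threads plus the
 * dispatcher, then tear it all down.  udpif_destroy() itself begins by
 * calling udpif_recv_set(udpif, 0, false) to stop the threads. */
struct udpif *udpif = udpif_create(backer, dpif);

udpif_recv_set(udpif, n_handlers, true);   /* Spawn dispatcher + handlers. */
/* ... datapath runs; ofproto-dpif drains drop keys and miss batches ... */
udpif_recv_set(udpif, 0, false);           /* Join all threads. */
udpif_destroy(udpif);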
@@ -137,7 +145,6 @@ udpif_destroy(struct udpif *udpif)
 {
     struct flow_miss_batch *fmb;
     struct drop_key *drop_key;
-    struct upcall *upcall;
 
     udpif_recv_set(udpif, 0, false);
 
@@ -145,25 +152,21 @@ udpif_destroy(struct udpif *udpif)
         drop_key_destroy(drop_key);
     }
 
-    while ((upcall = upcall_next(udpif))) {
-        upcall_destroy(upcall);
-    }
-
     while ((fmb = flow_miss_batch_next(udpif))) {
         flow_miss_batch_destroy(fmb);
     }
 
-    ovs_mutex_destroy(&udpif->drop_key_mutex);
-    ovs_mutex_destroy(&udpif->upcall_mutex);
-    ovs_mutex_destroy(&udpif->fmb_mutex);
+    guarded_list_destroy(&udpif->drop_keys);
+    guarded_list_destroy(&udpif->fmbs);
     latch_destroy(&udpif->exit_latch);
     seq_destroy(udpif->wait_seq);
     free(udpif);
 }
 
 /* Tells 'udpif' to begin or stop handling flow misses depending on the value
- * of 'enable'.  'n_handlers' is the number of miss_handler threads to create.
- * Passing 'n_handlers' as zero is equivalent to passing 'enable' as false. */
+ * of 'enable'.  'n_handlers' is the number of upcall_handler threads to
+ * create.  Passing 'n_handlers' as zero is equivalent to passing 'enable' as
+ * false. */
 void
 udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
 {
@@ -220,42 +223,26 @@ udpif_recv_set(struct udpif *udpif, size_t n_handlers, bool enable)
 
             handler->udpif = udpif;
             list_init(&handler->upcalls);
+            handler->need_signal = false;
             xpthread_cond_init(&handler->wake_cond, NULL);
             ovs_mutex_init(&handler->mutex);
-            xpthread_create(&handler->thread, NULL, udpif_miss_handler, handler);
+            xpthread_create(&handler->thread, NULL, udpif_upcall_handler,
+                            handler);
         }
         xpthread_create(&udpif->dispatcher, NULL, udpif_dispatcher, udpif);
     }
 }
 
-void
-udpif_run(struct udpif *udpif)
-{
-    udpif->last_seq = seq_read(udpif->wait_seq);
-}
-
 void
 udpif_wait(struct udpif *udpif)
 {
-    ovs_mutex_lock(&udpif->drop_key_mutex);
-    if (udpif->n_drop_keys) {
-        poll_immediate_wake();
-    }
-    ovs_mutex_unlock(&udpif->drop_key_mutex);
-
-    ovs_mutex_lock(&udpif->upcall_mutex);
-    if (udpif->n_upcalls) {
-        poll_immediate_wake();
-    }
-    ovs_mutex_unlock(&udpif->upcall_mutex);
-
-    ovs_mutex_lock(&udpif->fmb_mutex);
-    if (udpif->n_fmbs) {
+    uint64_t seq = seq_read(udpif->wait_seq);
+    if (!guarded_list_is_empty(&udpif->drop_keys) ||
+        !guarded_list_is_empty(&udpif->fmbs)) {
         poll_immediate_wake();
+    } else {
+        seq_wait(udpif->wait_seq, seq);
     }
-    ovs_mutex_unlock(&udpif->fmb_mutex);
-
-    seq_wait(udpif->wait_seq, udpif->last_seq);
 }
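
The rewritten udpif_wait() uses the standard seq pattern for avoiding lost wakeups: sample the sequence number first, then test for pending work, and only then block on the sampled value, so a seq_change() that races with the check still wakes the poll loop. In miniature, with work_is_pending() as a hypothetical stand-in for the two guarded_list checks:

/* Lost-wakeup-free wait: any seq_change() after step 1 makes the
 * seq_wait() in step 3 return immediately. */
uint64_t seq = seq_read(wait_seq);      /* 1. Sample the sequence. */
if (work_is_pending()) {                /* 2. Check for queued work. */
    poll_immediate_wake();              /*    Re-run the main loop now. */
} else {
    seq_wait(wait_seq, seq);            /* 3. Sleep until 'seq' moves. */
}

This also retires udpif_run() and the cached last_seq, since the sample now happens inside the wait itself.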
 
 /* Notifies 'udpif' that something changed which may render previous
@@ -265,43 +252,26 @@ udpif_revalidate(struct udpif *udpif)
 {
     struct flow_miss_batch *fmb, *next_fmb;
     unsigned int junk;
+    struct list fmbs;
 
     /* Since we remove each miss on revalidation, their statistics won't be
      * accounted to the appropriate 'facet's in the upper layer.  In most
      * cases, this is alright because we've already pushed the stats to the
      * relevant rules.  However, NetFlow requires absolute packet counts on
      * 'facet's which could now be incorrect. */
-    ovs_mutex_lock(&udpif->fmb_mutex);
     atomic_add(&udpif->reval_seq, 1, &junk);
-    LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &udpif->fmbs) {
+
+    guarded_list_pop_all(&udpif->fmbs, &fmbs);
+    LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &fmbs) {
         list_remove(&fmb->list_node);
         flow_miss_batch_destroy(fmb);
-        udpif->n_fmbs--;
     }
-    ovs_mutex_unlock(&udpif->fmb_mutex);
-    udpif_drop_key_clear(udpif);
-}
 
-/* Retreives the next upcall which ofproto-dpif is responsible for handling.
- * The caller is responsible for destroying the returned upcall with
- * upcall_destroy(). */
-struct upcall *
-upcall_next(struct udpif *udpif)
-{
-    struct upcall *next = NULL;
-
-    ovs_mutex_lock(&udpif->upcall_mutex);
-    if (udpif->n_upcalls) {
-        udpif->n_upcalls--;
-        next = CONTAINER_OF(list_pop_front(&udpif->upcalls), struct upcall,
-                            list_node);
-    }
-    ovs_mutex_unlock(&udpif->upcall_mutex);
-    return next;
+    udpif_drop_key_clear(udpif);
 }
 
 /* Destroys and deallocates 'upcall'. */
-void
+static void
 upcall_destroy(struct upcall *upcall)
 {
     if (upcall) {
@@ -310,22 +280,34 @@ upcall_destroy(struct upcall *upcall)
     }
 }
 
-/* Retreives the next batch of processed flow misses for 'udpif' to install.
+/* Retrieves the next batch of processed flow misses for 'udpif' to install.
  * The caller is responsible for destroying it with flow_miss_batch_destroy().
  */
 struct flow_miss_batch *
 flow_miss_batch_next(struct udpif *udpif)
 {
-    struct flow_miss_batch *next = NULL;
+    int i;
 
-    ovs_mutex_lock(&udpif->fmb_mutex);
-    if (udpif->n_fmbs) {
-        udpif->n_fmbs--;
-        next = CONTAINER_OF(list_pop_front(&udpif->fmbs),
-                            struct flow_miss_batch, list_node);
+    for (i = 0; i < 50; i++) {
+        struct flow_miss_batch *next;
+        unsigned int reval_seq;
+        struct list *next_node;
+
+        next_node = guarded_list_pop_front(&udpif->fmbs);
+        if (!next_node) {
+            break;
+        }
+
+        next = CONTAINER_OF(next_node, struct flow_miss_batch, list_node);
+        atomic_read(&udpif->reval_seq, &reval_seq);
+        if (next->reval_seq == reval_seq) {
+            return next;
+        }
+
+        flow_miss_batch_destroy(next);
     }
-    ovs_mutex_unlock(&udpif->fmb_mutex);
-    return next;
+
+    return NULL;
 }
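
Each batch now records the reval_seq that was current when it was built (set in handle_upcalls() below), so flow_miss_batch_next() discards batches made stale by an intervening udpif_revalidate() instead of handing them up; the 50-iteration cap bounds how much discarding a single call does. A hedged sketch of the consumer loop on the ofproto-dpif side:

/* Illustrative consumer; the real loop lives in ofproto-dpif.c and
 * installs a datapath flow for each miss before destroying the batch. */
struct flow_miss_batch *fmb;

while ((fmb = flow_miss_batch_next(udpif)) != NULL) {
    /* ... handle each flow_miss in fmb->misses ... */
    flow_miss_batch_destroy(fmb);
}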
 
 /* Destroys and deallocates 'fmb'. */
@@ -333,6 +315,7 @@ void
 flow_miss_batch_destroy(struct flow_miss_batch *fmb)
 {
     struct flow_miss *miss, *next;
+    struct upcall *upcall, *next_upcall;
 
     if (!fmb) {
         return;
@@ -343,59 +326,25 @@ flow_miss_batch_destroy(struct flow_miss_batch *fmb)
         miss_destroy(miss);
     }
 
+    LIST_FOR_EACH_SAFE (upcall, next_upcall, list_node, &fmb->upcalls) {
+        list_remove(&upcall->list_node);
+        upcall_destroy(upcall);
+    }
+
     hmap_destroy(&fmb->misses);
     free(fmb);
 }
 
-/* Discards any flow miss batches queued up in 'udpif' for 'ofproto' (because
- * 'ofproto' is being destroyed).
- *
- * 'ofproto''s xports must already have been removed, otherwise new flow miss
- * batches could still end up getting queued. */
-void
-flow_miss_batch_ofproto_destroyed(struct udpif *udpif,
-                                  const struct ofproto_dpif *ofproto)
-{
-    struct flow_miss_batch *fmb, *next_fmb;
-
-    ovs_mutex_lock(&udpif->fmb_mutex);
-    LIST_FOR_EACH_SAFE (fmb, next_fmb, list_node, &udpif->fmbs) {
-        struct flow_miss *miss, *next_miss;
-
-        HMAP_FOR_EACH_SAFE (miss, next_miss, hmap_node, &fmb->misses) {
-            if (miss->ofproto == ofproto) {
-                hmap_remove(&fmb->misses, &miss->hmap_node);
-                miss_destroy(miss);
-            }
-        }
-
-        if (hmap_is_empty(&fmb->misses)) {
-            list_remove(&fmb->list_node);
-            flow_miss_batch_destroy(fmb);
-            udpif->n_fmbs--;
-        }
-    }
-    ovs_mutex_unlock(&udpif->fmb_mutex);
-}
-
-/* Retreives the next drop key which ofproto-dpif needs to process.  The caller
+/* Retrieves the next drop key which ofproto-dpif needs to process.  The caller
  * is responsible for destroying it with drop_key_destroy(). */
 struct drop_key *
 drop_key_next(struct udpif *udpif)
 {
-    struct drop_key *next = NULL;
-
-    ovs_mutex_lock(&udpif->drop_key_mutex);
-    if (udpif->n_drop_keys) {
-        udpif->n_drop_keys--;
-        next = CONTAINER_OF(list_pop_front(&udpif->drop_keys), struct drop_key,
-                            list_node);
-    }
-    ovs_mutex_unlock(&udpif->drop_key_mutex);
-    return next;
+    struct list *next = guarded_list_pop_front(&udpif->drop_keys);
+    return next ? CONTAINER_OF(next, struct drop_key, list_node) : NULL;
 }
 
-/* Destorys and deallocates 'drop_key'. */
+/* Destroys and deallocates 'drop_key'. */
 void
 drop_key_destroy(struct drop_key *drop_key)
 {
@@ -410,19 +359,17 @@ void
 udpif_drop_key_clear(struct udpif *udpif)
 {
     struct drop_key *drop_key, *next;
+    struct list list;
 
-    ovs_mutex_lock(&udpif->drop_key_mutex);
-    LIST_FOR_EACH_SAFE (drop_key, next, list_node, &udpif->drop_keys) {
+    guarded_list_pop_all(&udpif->drop_keys, &list);
+    LIST_FOR_EACH_SAFE (drop_key, next, list_node, &list) {
         list_remove(&drop_key->list_node);
         drop_key_destroy(drop_key);
-        udpif->n_drop_keys--;
     }
-    ovs_mutex_unlock(&udpif->drop_key_mutex);
 }
 \f
-/* The dispatcher thread is responsible for receving upcalls from the kernel,
- * assigning the miss upcalls to a miss_handler thread, and assigning the more
- * complex ones to ofproto-dpif directly. */
+/* The dispatcher thread is responsible for receiving upcalls from the kernel
+ * and assigning each one to an upcall_handler thread. */
 static void *
 udpif_dispatcher(void *arg)
 {
@@ -439,17 +386,17 @@ udpif_dispatcher(void *arg)
     return NULL;
 }
 
-/* The miss handler thread is responsible for processing miss upcalls retreived
+/* The upcall handler thread is responsible for processing upcalls retrieved
  * by the dispatcher thread.  Once finished it passes the processed miss
  * upcalls to ofproto-dpif where they're installed in the datapath. */
 static void *
-udpif_miss_handler(void *arg)
+udpif_upcall_handler(void *arg)
 {
-    struct list misses = LIST_INITIALIZER(&misses);
     struct handler *handler = arg;
 
-    set_subprogram_name("miss_handler");
+    set_subprogram_name("upcall_handler");
     for (;;) {
+        struct list misses = LIST_INITIALIZER(&misses);
         size_t i;
 
         ovs_mutex_lock(&handler->mutex);
@@ -473,19 +420,15 @@ udpif_miss_handler(void *arg)
         }
         ovs_mutex_unlock(&handler->mutex);
 
-        handle_miss_upcalls(handler->udpif, &misses);
+        handle_upcalls(handler->udpif, &misses);
+
+        coverage_clear();
     }
 }
 \f
 static void
 miss_destroy(struct flow_miss *miss)
 {
-    struct upcall *upcall, *next;
-
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, &miss->upcalls) {
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
-    }
     xlate_out_uninit(&miss->xout);
 }
 
@@ -547,13 +490,14 @@ classify_upcall(const struct upcall *upcall)
 static void
 recv_upcalls(struct udpif *udpif)
 {
-    static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
-    size_t n_udpif_new_upcalls = 0;
-    struct handler *handler;
     int n;
 
     for (;;) {
+        uint32_t hash = udpif->secret;
+        struct handler *handler;
         struct upcall *upcall;
+        size_t n_bytes, left;
+        struct nlattr *nla;
         int error;
 
         upcall = xmalloc(sizeof *upcall);
@@ -566,86 +510,65 @@ recv_upcalls(struct udpif *udpif)
             break;
         }
 
-        upcall->type = classify_upcall(upcall);
-        if (upcall->type == BAD_UPCALL) {
-            upcall_destroy(upcall);
-        } else if (upcall->type == MISS_UPCALL) {
-            struct dpif_upcall *dupcall = &upcall->dpif_upcall;
-            uint32_t hash = udpif->secret;
-            struct nlattr *nla;
-            size_t n_bytes, left;
-
-            n_bytes = 0;
-            NL_ATTR_FOR_EACH (nla, left, dupcall->key, dupcall->key_len) {
-                enum ovs_key_attr type = nl_attr_type(nla);
-                if (type == OVS_KEY_ATTR_IN_PORT
-                    || type == OVS_KEY_ATTR_TCP
-                    || type == OVS_KEY_ATTR_UDP) {
-                    if (nl_attr_get_size(nla) == 4) {
-                        ovs_be32 attr = nl_attr_get_be32(nla);
-                        hash = mhash_add(hash, (OVS_FORCE uint32_t) attr);
-                        n_bytes += 4;
-                    } else {
-                        VLOG_WARN("Netlink attribute with incorrect size.");
-                    }
+        n_bytes = 0;
+        NL_ATTR_FOR_EACH (nla, left, upcall->dpif_upcall.key,
+                          upcall->dpif_upcall.key_len) {
+            enum ovs_key_attr type = nl_attr_type(nla);
+            if (type == OVS_KEY_ATTR_IN_PORT
+                || type == OVS_KEY_ATTR_TCP
+                || type == OVS_KEY_ATTR_UDP) {
+                if (nl_attr_get_size(nla) == 4) {
+                    hash = mhash_add(hash, nl_attr_get_u32(nla));
+                    n_bytes += 4;
+                } else {
+                    VLOG_WARN_RL(&rl,
+                                 "Netlink attribute with incorrect size.");
                 }
             }
-            hash =  mhash_finish(hash, n_bytes);
-
-            handler = &udpif->handlers[hash % udpif->n_handlers];
+        }
+        hash = mhash_finish(hash, n_bytes);
 
-            ovs_mutex_lock(&handler->mutex);
-            if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
-                list_push_back(&handler->upcalls, &upcall->list_node);
-                handler->n_new_upcalls = ++handler->n_upcalls;
+        handler = &udpif->handlers[hash % udpif->n_handlers];
 
-                if (handler->n_new_upcalls >= FLOW_MISS_MAX_BATCH) {
-                    xpthread_cond_signal(&handler->wake_cond);
-                }
-                ovs_mutex_unlock(&handler->mutex);
-                if (!VLOG_DROP_DBG(&rl)) {
-                    struct ds ds = DS_EMPTY_INITIALIZER;
-
-                    odp_flow_key_format(upcall->dpif_upcall.key,
-                                        upcall->dpif_upcall.key_len,
-                                        &ds);
-                    VLOG_DBG("dispatcher: miss enqueue (%s)", ds_cstr(&ds));
-                    ds_destroy(&ds);
-                }
-            } else {
-                ovs_mutex_unlock(&handler->mutex);
-                COVERAGE_INC(miss_queue_overflow);
-                upcall_destroy(upcall);
+        ovs_mutex_lock(&handler->mutex);
+        if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
+            list_push_back(&handler->upcalls, &upcall->list_node);
+            if (handler->n_upcalls == 0) {
+                handler->need_signal = true;
             }
-        } else {
-            ovs_mutex_lock(&udpif->upcall_mutex);
-            if (udpif->n_upcalls < MAX_QUEUE_LENGTH) {
-                n_udpif_new_upcalls = ++udpif->n_upcalls;
-                list_push_back(&udpif->upcalls, &upcall->list_node);
-                ovs_mutex_unlock(&udpif->upcall_mutex);
-
-                if (n_udpif_new_upcalls >= FLOW_MISS_MAX_BATCH) {
-                    seq_change(udpif->wait_seq);
-                }
-            } else {
-                ovs_mutex_unlock(&udpif->upcall_mutex);
-                COVERAGE_INC(upcall_queue_overflow);
-                upcall_destroy(upcall);
+            handler->n_upcalls++;
+            if (handler->need_signal &&
+                handler->n_upcalls >= FLOW_MISS_MAX_BATCH) {
+                handler->need_signal = false;
+                xpthread_cond_signal(&handler->wake_cond);
+            }
+            ovs_mutex_unlock(&handler->mutex);
+            if (!VLOG_DROP_DBG(&rl)) {
+                struct ds ds = DS_EMPTY_INITIALIZER;
+
+                odp_flow_key_format(upcall->dpif_upcall.key,
+                                    upcall->dpif_upcall.key_len,
+                                    &ds);
+                VLOG_DBG("dispatcher: enqueue (%s)", ds_cstr(&ds));
+                ds_destroy(&ds);
             }
+        } else {
+            ovs_mutex_unlock(&handler->mutex);
+            COVERAGE_INC(upcall_queue_overflow);
+            upcall_destroy(upcall);
         }
     }
+
     for (n = 0; n < udpif->n_handlers; ++n) {
-        handler = &udpif->handlers[n];
-        if (handler->n_new_upcalls) {
-            handler->n_new_upcalls = 0;
+        struct handler *handler = &udpif->handlers[n];
+
+        if (handler->need_signal) {
+            handler->need_signal = false;
             ovs_mutex_lock(&handler->mutex);
             xpthread_cond_signal(&handler->wake_cond);
             ovs_mutex_unlock(&handler->mutex);
         }
     }
-    if (n_udpif_new_upcalls) {
-        seq_change(udpif->wait_seq);
-    }
 }
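
Two things are worth spelling out in the rewritten dispatcher. First, handler selection hashes only the attributes that identify the flow (the input port plus the TCP or UDP ports), so every packet of a flow serializes onto the same handler while distinct flows spread across threads, with udpif->secret salting the hash. Condensed into a hypothetical helper:

/* Illustrative condensation of the handler-selection logic above;
 * 'choose_handler' is not a real function in this file. */
static struct handler *
choose_handler(struct udpif *udpif, const struct nlattr *key, size_t key_len)
{
    uint32_t hash = udpif->secret;          /* Per-udpif random salt. */
    const struct nlattr *nla;
    size_t n_bytes = 0;
    size_t left;

    NL_ATTR_FOR_EACH (nla, left, key, key_len) {
        enum ovs_key_attr type = nl_attr_type(nla);

        if ((type == OVS_KEY_ATTR_IN_PORT
             || type == OVS_KEY_ATTR_TCP
             || type == OVS_KEY_ATTR_UDP)
            && nl_attr_get_size(nla) == 4) {
            hash = mhash_add(hash, nl_attr_get_u32(nla));
            n_bytes += 4;
        }
    }
    return &udpif->handlers[mhash_finish(hash, n_bytes) % udpif->n_handlers];
}

Second, 'need_signal' is set when a handler's queue goes from empty to non-empty and cleared when the handler is signaled, so the dispatcher signals the condition variable once when a queue crosses FLOW_MISS_MAX_BATCH and the sweep at the end of recv_upcalls() wakes any handler whose queue filled only partially.  Each handler is therefore woken at most once per empty-to-non-empty transition rather than once per upcall.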
 
 static struct flow_miss *
@@ -663,204 +586,241 @@ flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
     return NULL;
 }
 
-/* Executes flow miss 'miss'.  May add any required datapath operations
- * to 'ops', incrementing '*n_ops' for each new op. */
 static void
-execute_flow_miss(struct flow_miss *miss, struct dpif_op *ops, size_t *n_ops)
-{
-    struct ofproto_dpif *ofproto = miss->ofproto;
-    struct flow_wildcards wc;
-    struct rule_dpif *rule;
-    struct ofpbuf *packet;
-    struct xlate_in xin;
-
-    memset(&miss->stats, 0, sizeof miss->stats);
-    miss->stats.used = time_msec();
-    LIST_FOR_EACH (packet, list_node, &miss->packets) {
-        miss->stats.tcp_flags |= packet_get_tcp_flags(packet, &miss->flow);
-        miss->stats.n_bytes += packet->size;
-        miss->stats.n_packets++;
-    }
-
-    flow_wildcards_init_catchall(&wc);
-    rule_dpif_lookup(ofproto, &miss->flow, &wc, &rule);
-    rule_dpif_credit_stats(rule, &miss->stats);
-    xlate_in_init(&xin, ofproto, &miss->flow, rule, miss->stats.tcp_flags,
-                  NULL);
-    xin.may_learn = true;
-    xin.resubmit_stats = &miss->stats;
-    xlate_actions(&xin, &miss->xout);
-    flow_wildcards_or(&miss->xout.wc, &miss->xout.wc, &wc);
-
-    if (rule_dpif_fail_open(rule)) {
-        LIST_FOR_EACH (packet, list_node, &miss->packets) {
-            struct ofputil_packet_in *pin;
-
-            /* Extra-special case for fail-open mode.
-             *
-             * We are in fail-open mode and the packet matched the fail-open
-             * rule, but we are connected to a controller too.  We should send
-             * the packet up to the controller in the hope that it will try to
-             * set up a flow and thereby allow us to exit fail-open.
-             *
-             * See the top-level comment in fail-open.c for more information. */
-            pin = xmalloc(sizeof(*pin));
-            pin->packet = xmemdup(packet->data, packet->size);
-            pin->packet_len = packet->size;
-            pin->reason = OFPR_NO_MATCH;
-            pin->controller_id = 0;
-            pin->table_id = 0;
-            pin->cookie = 0;
-            pin->send_len = 0; /* Not used for flow table misses. */
-            flow_get_metadata(&miss->flow, &pin->fmd);
-            ofproto_dpif_send_packet_in(ofproto, pin);
-        }
-    }
-
-    if (miss->xout.slow) {
-        LIST_FOR_EACH (packet, list_node, &miss->packets) {
-            struct xlate_in xin;
-
-            xlate_in_init(&xin, miss->ofproto, &miss->flow, rule, 0, packet);
-            xlate_actions_for_side_effects(&xin);
-        }
-    }
-    rule_dpif_unref(rule);
-
-    if (miss->xout.odp_actions.size) {
-        LIST_FOR_EACH (packet, list_node, &miss->packets) {
-            struct dpif_op *op = &ops[*n_ops];
-            struct dpif_execute *execute = &op->u.execute;
-
-            if (miss->flow.in_port.ofp_port
-                != vsp_realdev_to_vlandev(miss->ofproto,
-                                          miss->flow.in_port.ofp_port,
-                                          miss->flow.vlan_tci)) {
-                /* This packet was received on a VLAN splinter port.  We
-                 * added a VLAN to the packet to make the packet resemble
-                 * the flow, but the actions were composed assuming that
-                 * the packet contained no VLAN.  So, we must remove the
-                 * VLAN header from the packet before trying to execute the
-                 * actions. */
-                eth_pop_vlan(packet);
-            }
-
-            op->type = DPIF_OP_EXECUTE;
-            execute->key = miss->key;
-            execute->key_len = miss->key_len;
-            execute->packet = packet;
-            execute->actions = miss->xout.odp_actions.data;
-            execute->actions_len = miss->xout.odp_actions.size;
-
-            (*n_ops)++;
-        }
-    }
-}
-
-static void
-handle_miss_upcalls(struct udpif *udpif, struct list *upcalls)
+handle_upcalls(struct udpif *udpif, struct list *upcalls)
 {
     struct dpif_op *opsp[FLOW_MISS_MAX_BATCH];
     struct dpif_op ops[FLOW_MISS_MAX_BATCH];
-    unsigned int old_reval_seq, new_reval_seq;
     struct upcall *upcall, *next;
     struct flow_miss_batch *fmb;
-    size_t n_upcalls, n_ops, i;
+    size_t n_misses, n_ops, i;
     struct flow_miss *miss;
+    unsigned int reval_seq;
+    enum upcall_type type;
+    bool fail_open;
 
-    atomic_read(&udpif->reval_seq, &old_reval_seq);
-
-    /* Construct the to-do list.
+    /* Extract the flow from each upcall.  Construct in fmb->misses a hash
+     * table that maps each unique flow to a 'struct flow_miss'.
      *
-     * This just amounts to extracting the flow from each packet and sticking
-     * the packets that have the same flow in the same "flow_miss" structure so
-     * that we can process them together. */
+     * Most commonly there is a single packet per flow_miss, but there are
+     * several reasons why there might be more than one, e.g.:
+     *
+     *   - The dpif packet interface does not support TSO (or UFO, etc.), so a
+     *     large packet sent to userspace is split into a sequence of smaller
+     *     ones.
+     *
+     *   - A stream of quickly arriving packets in an established "slow-pathed"
+     *     flow.
+     *
+     *   - Rarely, a stream of quickly arriving packets in a flow not yet
+     *     established.  (This is rare because most protocols do not send
+     *     multiple back-to-back packets before receiving a reply from the
+     *     other end of the connection, which gives OVS a chance to set up a
+     *     datapath flow.)
+     */
     fmb = xmalloc(sizeof *fmb);
+    atomic_read(&udpif->reval_seq, &fmb->reval_seq);
     hmap_init(&fmb->misses);
-    n_upcalls = 0;
+    list_init(&fmb->upcalls);
+    n_misses = 0;
     LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
         struct dpif_upcall *dupcall = &upcall->dpif_upcall;
-        struct flow_miss *miss = &fmb->miss_buf[n_upcalls];
+        struct ofpbuf *packet = dupcall->packet;
+        struct flow_miss *miss = &fmb->miss_buf[n_misses];
         struct flow_miss *existing_miss;
         struct ofproto_dpif *ofproto;
+        struct dpif_sflow *sflow;
+        struct dpif_ipfix *ipfix;
         odp_port_t odp_in_port;
         struct flow flow;
-        uint32_t hash;
         int error;
 
-        error = xlate_receive(udpif->backer, dupcall->packet, dupcall->key,
+        error = xlate_receive(udpif->backer, packet, dupcall->key,
                               dupcall->key_len, &flow, &miss->key_fitness,
                               &ofproto, &odp_in_port);
+        if (error) {
+            if (error == ENODEV) {
+                struct drop_key *drop_key;
+
+                /* Received packet on datapath port for which we couldn't
+                 * associate an ofproto.  This can happen if a port is removed
+                 * while traffic is being received.  Print a rate-limited
+                 * message in case it happens frequently.  Install a drop flow
+                 * so that future packets of the flow are inexpensively dropped
+                 * in the kernel. */
+                VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
+                             "port %"PRIu32, odp_in_port);
+
+                drop_key = xmalloc(sizeof *drop_key);
+                drop_key->key = xmemdup(dupcall->key, dupcall->key_len);
+                drop_key->key_len = dupcall->key_len;
+
+                if (guarded_list_push_back(&udpif->drop_keys,
+                                           &drop_key->list_node,
+                                           MAX_QUEUE_LENGTH)) {
+                    seq_change(udpif->wait_seq);
+                } else {
+                    COVERAGE_INC(drop_queue_overflow);
+                    drop_key_destroy(drop_key);
+                }
+            }
+            list_remove(&upcall->list_node);
+            upcall_destroy(upcall);
+            continue;
+        }
 
-        if (error == ENODEV) {
-            struct drop_key *drop_key;
-
-            /* Received packet on datapath port for which we couldn't
-             * associate an ofproto.  This can happen if a port is removed
-             * while traffic is being received.  Print a rate-limited message
-             * in case it happens frequently.  Install a drop flow so
-             * that future packets of the flow are inexpensively dropped
-             * in the kernel. */
-            VLOG_INFO_RL(&rl, "received packet on unassociated datapath port "
-                              "%"PRIu32, odp_in_port);
-
-            drop_key = xmalloc(sizeof *drop_key);
-            drop_key->key = xmemdup(dupcall->key, dupcall->key_len);
-            drop_key->key_len = dupcall->key_len;
-
-            ovs_mutex_lock(&udpif->drop_key_mutex);
-            if (udpif->n_drop_keys < MAX_QUEUE_LENGTH) {
-                udpif->n_drop_keys++;
-                list_push_back(&udpif->drop_keys, &drop_key->list_node);
-                ovs_mutex_unlock(&udpif->drop_key_mutex);
-                seq_change(udpif->wait_seq);
+        type = classify_upcall(upcall);
+        if (type == MISS_UPCALL) {
+            uint32_t hash;
+
+            flow_extract(packet, flow.skb_priority, flow.pkt_mark,
+                         &flow.tunnel, &flow.in_port, &miss->flow);
+
+            hash = flow_hash(&miss->flow, 0);
+            existing_miss = flow_miss_find(&fmb->misses, ofproto, &miss->flow,
+                                           hash);
+            if (!existing_miss) {
+                hmap_insert(&fmb->misses, &miss->hmap_node, hash);
+                miss->ofproto = ofproto;
+                miss->key = dupcall->key;
+                miss->key_len = dupcall->key_len;
+                miss->upcall_type = dupcall->type;
+                miss->stats.n_packets = 0;
+                miss->stats.n_bytes = 0;
+                miss->stats.used = time_msec();
+                miss->stats.tcp_flags = 0;
+
+                n_misses++;
             } else {
-                ovs_mutex_unlock(&udpif->drop_key_mutex);
-                COVERAGE_INC(drop_queue_overflow);
-                drop_key_destroy(drop_key);
+                miss = existing_miss;
             }
-            continue;
-        } else if (error) {
+            miss->stats.tcp_flags |= packet_get_tcp_flags(packet, &miss->flow);
+            miss->stats.n_bytes += packet->size;
+            miss->stats.n_packets++;
+
+            upcall->flow_miss = miss;
             continue;
         }
 
-        flow_extract(dupcall->packet, flow.skb_priority, flow.pkt_mark,
-                     &flow.tunnel, &flow.in_port, &miss->flow);
-
-        /* Add other packets to a to-do list. */
-        hash = flow_hash(&miss->flow, 0);
-        existing_miss = flow_miss_find(&fmb->misses, ofproto, &miss->flow, hash);
-        if (!existing_miss) {
-            hmap_insert(&fmb->misses, &miss->hmap_node, hash);
-            miss->ofproto = ofproto;
-            miss->key = dupcall->key;
-            miss->key_len = dupcall->key_len;
-            miss->upcall_type = dupcall->type;
-            list_init(&miss->packets);
-            list_init(&miss->upcalls);
-
-            n_upcalls++;
-        } else {
-            miss = existing_miss;
+        switch (type) {
+        case SFLOW_UPCALL:
+            sflow = xlate_get_sflow(ofproto);
+            if (sflow) {
+                union user_action_cookie cookie;
+
+                memset(&cookie, 0, sizeof cookie);
+                memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                       sizeof cookie.sflow);
+                dpif_sflow_received(sflow, dupcall->packet, &flow, odp_in_port,
+                                    &cookie);
+                dpif_sflow_unref(sflow);
+            }
+            break;
+        case IPFIX_UPCALL:
+            ipfix = xlate_get_ipfix(ofproto);
+            if (ipfix) {
+                dpif_ipfix_bridge_sample(ipfix, dupcall->packet, &flow);
+                dpif_ipfix_unref(ipfix);
+            }
+            break;
+        case FLOW_SAMPLE_UPCALL:
+            ipfix = xlate_get_ipfix(ofproto);
+            if (ipfix) {
+                union user_action_cookie cookie;
+
+                memset(&cookie, 0, sizeof cookie);
+                memcpy(&cookie, nl_attr_get(dupcall->userdata),
+                       sizeof cookie.flow_sample);
+
+                /* The flow reflects exactly the contents of the packet.
+                 * Sample the packet using it. */
+                dpif_ipfix_flow_sample(ipfix, dupcall->packet, &flow,
+                                       cookie.flow_sample.collector_set_id,
+                                       cookie.flow_sample.probability,
+                                       cookie.flow_sample.obs_domain_id,
+                                       cookie.flow_sample.obs_point_id);
+                dpif_ipfix_unref(ipfix);
+            }
+            break;
+        case BAD_UPCALL:
+            break;
+        case MISS_UPCALL:
+            NOT_REACHED();
         }
-        list_push_back(&miss->packets, &dupcall->packet->list_node);
 
         list_remove(&upcall->list_node);
-        list_push_back(&miss->upcalls, &upcall->list_node);
+        upcall_destroy(upcall);
     }
 
-    LIST_FOR_EACH_SAFE (upcall, next, list_node, upcalls) {
-        list_remove(&upcall->list_node);
-        upcall_destroy(upcall);
+    /* Initialize each 'struct flow_miss's ->xout.
+     *
+     * We do this per-flow_miss rather than per-packet because, most commonly,
+     * all the packets in a flow can use the same translation.
+     *
+     * We can't do this in the previous loop because we need the TCP flags for
+     * all the packets in each miss. */
+    fail_open = false;
+    HMAP_FOR_EACH (miss, hmap_node, &fmb->misses) {
+        struct xlate_in xin;
+
+        xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL,
+                      miss->stats.tcp_flags, NULL);
+        xin.may_learn = true;
+        xin.resubmit_stats = &miss->stats;
+        xlate_actions(&xin, &miss->xout);
+        fail_open = fail_open || miss->xout.fail_open;
     }
 
-    /* Process each element in the to-do list, constructing the set of
-     * operations to batch. */
+    /* Now handle the packets individually in order of arrival.  In the common
+     * case each packet of a miss can share the same actions, but slow-pathed
+     * packets need to be translated individually:
+     *
+     *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
+     *     processes received packets for these protocols.
+     *
+     *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
+     *     controller.
+     *
+     * The loop fills 'ops' with an array of operations to execute in the
+     * datapath. */
     n_ops = 0;
-    HMAP_FOR_EACH (miss, hmap_node, &fmb->misses) {
-        execute_flow_miss(miss, ops, &n_ops);
+    LIST_FOR_EACH (upcall, list_node, upcalls) {
+        struct flow_miss *miss = upcall->flow_miss;
+        struct ofpbuf *packet = upcall->dpif_upcall.packet;
+
+        if (miss->xout.slow) {
+            struct xlate_in xin;
+
+            xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL, 0, packet);
+            xlate_actions_for_side_effects(&xin);
+        }
+
+        if (miss->xout.odp_actions.size) {
+            struct dpif_op *op;
+
+            if (miss->flow.in_port.ofp_port
+                != vsp_realdev_to_vlandev(miss->ofproto,
+                                          miss->flow.in_port.ofp_port,
+                                          miss->flow.vlan_tci)) {
+                /* This packet was received on a VLAN splinter port.  We
+                 * added a VLAN to the packet to make the packet resemble
+                 * the flow, but the actions were composed assuming that
+                 * the packet contained no VLAN.  So, we must remove the
+                 * VLAN header from the packet before trying to execute the
+                 * actions. */
+                eth_pop_vlan(packet);
+            }
+
+            op = &ops[n_ops++];
+            op->type = DPIF_OP_EXECUTE;
+            op->u.execute.key = miss->key;
+            op->u.execute.key_len = miss->key_len;
+            op->u.execute.packet = packet;
+            op->u.execute.actions = miss->xout.odp_actions.data;
+            op->u.execute.actions_len = miss->xout.odp_actions.size;
+            op->u.execute.needs_help = (miss->xout.slow & SLOW_ACTION) != 0;
+        }
     }
-    ovs_assert(n_ops <= ARRAY_SIZE(ops));
 
     /* Execute batch. */
     for (i = 0; i < n_ops; i++) {
@@ -868,21 +828,43 @@ handle_miss_upcalls(struct udpif *udpif, struct list *upcalls)
     }
     dpif_operate(udpif->dpif, opsp, n_ops);
 
-    ovs_mutex_lock(&udpif->fmb_mutex);
-    atomic_read(&udpif->reval_seq, &new_reval_seq);
-    if (old_reval_seq != new_reval_seq) {
-        /* udpif_revalidate() was called as we were calculating the actions.
-         * To be safe, we need to assume all the misses need revalidation. */
-        ovs_mutex_unlock(&udpif->fmb_mutex);
+    /* Special case for fail-open mode.
+     *
+     * If we are in fail-open mode, but we are connected to a controller too,
+     * then we should send the packet up to the controller in the hope that it
+     * will try to set up a flow and thereby allow us to exit fail-open.
+     *
+     * See the top-level comment in fail-open.c for more information. */
+    if (fail_open) {
+        LIST_FOR_EACH (upcall, list_node, upcalls) {
+            struct flow_miss *miss = upcall->flow_miss;
+            struct ofpbuf *packet = upcall->dpif_upcall.packet;
+            struct ofproto_packet_in *pin;
+
+            pin = xmalloc(sizeof *pin);
+            pin->up.packet = xmemdup(packet->data, packet->size);
+            pin->up.packet_len = packet->size;
+            pin->up.reason = OFPR_NO_MATCH;
+            pin->up.table_id = 0;
+            pin->up.cookie = OVS_BE64_MAX;
+            flow_get_metadata(&miss->flow, &pin->up.fmd);
+            pin->send_len = 0; /* Not used for flow table misses. */
+            pin->generated_by_table_miss = false;
+            ofproto_dpif_send_packet_in(miss->ofproto, pin);
+        }
+    }
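
This hunk is where the commit subject comes in: packet-ins now travel as struct ofproto_packet_in, and the new generated_by_table_miss flag (false here, since fail-open packet-ins are genuine no-matches) lets connmgr tell a real no-match apart from a packet-in produced by an OpenFlow 1.3 table-miss flow entry's controller action, which the 1.3 spec also requires to be reported as OFPR_NO_MATCH. A hedged sketch of the connmgr-side fixup this enables; see connmgr.c in the same commit for the authoritative version:

/* Sketch of the reason fixup on the connmgr side: a packet-in whose
 * OFPR_ACTION reason came from a table-miss flow entry is rewritten to
 * OFPR_NO_MATCH for OpenFlow 1.3+ connections. */
static enum ofp_packet_in_reason
wire_reason(struct ofconn *ofconn, const struct ofproto_packet_in *pin)
{
    if (pin->generated_by_table_miss && pin->up.reason == OFPR_ACTION) {
        enum ofputil_protocol protocol = ofconn_get_protocol(ofconn);

        if (protocol != OFPUTIL_P_NONE
            && ofputil_protocol_to_ofp_version(protocol) >= OFP13_VERSION) {
            return OFPR_NO_MATCH;
        }
    }
    return pin->up.reason;
}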
+
+    list_move(&fmb->upcalls, upcalls);
+
+    atomic_read(&udpif->reval_seq, &reval_seq);
+    if (reval_seq != fmb->reval_seq) {
+        COVERAGE_INC(fmb_queue_revalidated);
         flow_miss_batch_destroy(fmb);
-    } else if (udpif->n_fmbs < MAX_QUEUE_LENGTH) {
-        udpif->n_fmbs++;
-        list_push_back(&udpif->fmbs, &fmb->list_node);
-        ovs_mutex_unlock(&udpif->fmb_mutex);
-        seq_change(udpif->wait_seq);
-    } else {
+    } else if (!guarded_list_push_back(&udpif->fmbs, &fmb->list_node,
+                                       MAX_QUEUE_LENGTH)) {
         COVERAGE_INC(fmb_queue_overflow);
-        ovs_mutex_unlock(&udpif->fmb_mutex);
         flow_miss_batch_destroy(fmb);
+    } else {
+        seq_change(udpif->wait_seq);
     }
 }