dpif: Refactor flow dumping interface to make better sense for batching.
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
index 522aa8c..03b1332 100644 (file)
@@ -102,7 +102,7 @@ struct udpif {
     bool need_revalidate;              /* As indicated by 'reval_seq'. */
     bool reval_exit;                   /* Set by leader on 'exit_latch. */
     pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
-    struct dpif_flow_dump dump;        /* DPIF flow dump state. */
+    struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
     long long int dump_duration;       /* Duration of the last flow dump. */
     struct seq *dump_seq;              /* Increments each dump iteration. */
 
@@ -580,7 +580,7 @@ udpif_revalidator(void *arg)
 
             start_time = time_msec();
             if (!udpif->reval_exit) {
-                dpif_flow_dump_start(&udpif->dump, udpif->dpif);
+                udpif->dump = dpif_flow_dump_create(udpif->dpif);
             }
         }
 
@@ -601,7 +601,7 @@ udpif_revalidator(void *arg)
         if (leader) {
             long long int duration;
 
-            dpif_flow_dump_done(&udpif->dump);
+            dpif_flow_dump_destroy(udpif->dump);
             seq_change(udpif->dump_seq);
 
             duration = MAX(time_msec() - start_time, 1);
@@ -1205,9 +1205,7 @@ should_revalidate(uint64_t packets, long long int used)
 
 static bool
 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
-                const struct nlattr *mask, size_t mask_len,
-                const struct nlattr *actions, size_t actions_len,
-                const struct dpif_flow_stats *stats)
+                const struct dpif_flow *f)
 {
     uint64_t slow_path_buf[128 / 8];
     struct xlate_out xout, *xoutp;
@@ -1230,14 +1228,14 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
 
     ovs_mutex_lock(&ukey->mutex);
     last_used = ukey->stats.used;
-    push.used = stats->used;
-    push.tcp_flags = stats->tcp_flags;
-    push.n_packets = stats->n_packets > ukey->stats.n_packets
-        ? stats->n_packets - ukey->stats.n_packets
-        : 0;
-    push.n_bytes = stats->n_bytes > ukey->stats.n_bytes
-        ? stats->n_bytes - ukey->stats.n_bytes
-        : 0;
+    push.used = f->stats.used;
+    push.tcp_flags = f->stats.tcp_flags;
+    push.n_packets = (f->stats.n_packets > ukey->stats.n_packets
+                      ? f->stats.n_packets - ukey->stats.n_packets
+                      : 0);
+    push.n_bytes = (f->stats.n_bytes > ukey->stats.n_bytes
+                    ? f->stats.n_bytes - ukey->stats.n_bytes
+                    : 0);
 
     if (!ukey->flow_exists) {
         /* Don't bother revalidating if the flow was already deleted. */
@@ -1251,7 +1249,7 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
     }
 
     /* We will push the stats, so update the ukey stats cache. */
-    ukey->stats = *stats;
+    ukey->stats = f->stats;
     if (!push.n_packets && !udpif->need_revalidate) {
         ok = true;
         goto exit;
@@ -1298,12 +1296,12 @@ revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
     }
 
-    if (actions_len != ofpbuf_size(&xout_actions)
-        || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) {
+    if (f->actions_len != ofpbuf_size(&xout_actions)
+        || memcmp(ofpbuf_data(&xout_actions), f->actions, f->actions_len)) {
         goto exit;
     }
 
-    if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow)
+    if (odp_flow_key_to_mask(f->mask, f->mask_len, &dp_mask, &flow)
         == ODP_FIT_ERROR) {
         goto exit;
     }
@@ -1436,109 +1434,105 @@ static void
 revalidate(struct revalidator *revalidator)
 {
     struct udpif *udpif = revalidator->udpif;
-
-    struct dump_op ops[REVALIDATE_MAX_BATCH];
-    const struct nlattr *key, *mask, *actions;
-    size_t key_len, mask_len, actions_len;
-    const struct dpif_flow_stats *stats;
-    long long int now;
+    struct dpif_flow_dump_thread *dump_thread;
     unsigned int flow_limit;
-    size_t n_ops;
-    void *state;
 
-    n_ops = 0;
-    now = time_msec();
     atomic_read(&udpif->flow_limit, &flow_limit);
+    dump_thread = dpif_flow_dump_thread_create(udpif->dump);
+    for (;;) {
+        struct dump_op ops[REVALIDATE_MAX_BATCH];
+        int n_ops = 0;
 
-    dpif_flow_dump_state_init(udpif->dpif, &state);
-    while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
-                               &mask_len, &actions, &actions_len, &stats)) {
-        struct udpif_key *ukey;
-        bool mark, may_destroy;
-        long long int used, max_idle;
-        uint32_t hash;
-        size_t n_flows;
-
-        hash = hash_bytes(key, key_len, udpif->secret);
-        ukey = ukey_lookup(udpif, key, key_len, hash);
-
-        used = stats->used;
-        if (!used && ukey) {
-            ovs_mutex_lock(&ukey->mutex);
-
-            if (ukey->mark || !ukey->flow_exists) {
-                /* The flow has already been dumped. This can occasionally
-                 * occur if the datapath is changed in the middle of a flow
-                 * dump. Rather than perform the same work twice, skip the
-                 * flow this time. */
-                ovs_mutex_unlock(&ukey->mutex);
-                COVERAGE_INC(upcall_duplicate_flow);
-                goto next;
-            }
+        struct dpif_flow flows[REVALIDATE_MAX_BATCH];
+        const struct dpif_flow *f;
+        int n_dumped;
 
-            used = ukey->created;
-            ovs_mutex_unlock(&ukey->mutex);
-        }
+        long long int max_idle;
+        long long int now;
+        size_t n_dp_flows;
+        bool kill_them_all;
 
-        n_flows = udpif_get_n_flows(udpif);
-        max_idle = ofproto_max_idle;
-        if (n_flows > flow_limit) {
-            max_idle = 100;
+        n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
+        if (!n_dumped) {
+            break;
         }
 
-        if ((used && used < now - max_idle) || n_flows > flow_limit * 2) {
-            mark = false;
-        } else {
-            if (!ukey) {
-                ukey = ukey_create(key, key_len, used);
-                if (!udpif_insert_ukey(udpif, ukey, hash)) {
-                    /* The same ukey has already been created. This means that
-                     * another revalidator is processing this flow
-                     * concurrently, so don't bother processing it. */
-                    ukey_delete(NULL, ukey);
-                    goto next;
+        now = time_msec();
+
+        /* In normal operation we want to keep flows around until they have
+         * been idle for 'ofproto_max_idle' milliseconds.  However:
+         *
+         *     - If the number of datapath flows climbs above 'flow_limit',
+         *       drop that down to 100 ms to try to bring the flows down to
+         *       the limit.
+         *
+         *     - If the number of datapath flows climbs above twice
+         *       'flow_limit', delete all the datapath flows as an emergency
+         *       measure.  (We reassess this condition for the next batch of
+         *       datapath flows, so we will recover before all the flows are
+         *       gone.) */
+        n_dp_flows = udpif_get_n_flows(udpif);
+        kill_them_all = n_dp_flows > flow_limit * 2;
+        max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
+
+        for (f = flows; f < &flows[n_dumped]; f++) {
+            long long int used = f->stats.used;
+            uint32_t hash = hash_bytes(f->key, f->key_len, udpif->secret);
+            struct udpif_key *ukey = ukey_lookup(udpif, f->key, f->key_len,
+                                                 hash);
+            bool mark;
+
+            if (!used && ukey) {
+                bool already_dumped;
+
+                ovs_mutex_lock(&ukey->mutex);
+                already_dumped = ukey->mark || !ukey->flow_exists;
+                used = ukey->created;
+                ovs_mutex_unlock(&ukey->mutex);
+
+                if (already_dumped) {
+                    /* The flow has already been dumped. This can occasionally
+                     * occur if the datapath is changed in the middle of a flow
+                     * dump. Rather than perform the same work twice, skip the
+                     * flow this time. */
+                    COVERAGE_INC(upcall_duplicate_flow);
+                    continue;
                 }
             }
 
-            mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
-                                   actions_len, stats);
-        }
-
-        if (ukey) {
-            ovs_mutex_lock(&ukey->mutex);
-            ukey->mark = ukey->flow_exists = mark;
-            ovs_mutex_unlock(&ukey->mutex);
-        }
+            if (kill_them_all || (used && used < now - max_idle)) {
+                mark = false;
+            } else {
+                if (!ukey) {
+                    ukey = ukey_create(f->key, f->key_len, used);
+                    if (!udpif_insert_ukey(udpif, ukey, hash)) {
+                        /* The same ukey has already been created. This means
+                         * that another revalidator is processing this flow
+                         * concurrently, so don't bother processing it. */
+                        ukey_delete(NULL, ukey);
+                        continue;
+                    }
+                }
 
-        if (!mark) {
-            dump_op_init(&ops[n_ops++], key, key_len, ukey);
-        }
+                mark = revalidate_ukey(udpif, ukey, f);
+            }
 
-    next:
-        may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
-                                                           state);
+            if (ukey) {
+                ovs_mutex_lock(&ukey->mutex);
+                ukey->mark = ukey->flow_exists = mark;
+                ovs_mutex_unlock(&ukey->mutex);
+            }
 
-        /* Only update 'now' immediately before 'buffer' will be updated.
-         * This gives us the current time relative to the time the datapath
-         * will write into 'stats'. */
-        if (may_destroy) {
-            now = time_msec();
+            if (!mark) {
+                dump_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
+            }
         }
 
-        /* Only do a dpif_operate when we've hit our maximum batch, or when our
-         * memory is about to be clobbered by the next call to
-         * dpif_flow_dump_next(). */
-        if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
+        if (n_ops) {
             push_dump_ops__(udpif, ops, n_ops);
-            n_ops = 0;
         }
     }
-
-    if (n_ops) {
-        push_dump_ops__(udpif, ops, n_ops);
-    }
-
-    dpif_flow_dump_state_uninit(udpif->dpif, state);
+    dpif_flow_dump_thread_destroy(dump_thread);
 }
 
 static void