lib/cmap: cmap_find_batch().
authorJarno Rajahalme <jrajahalme@nicira.com>
Wed, 24 Sep 2014 17:39:20 +0000 (10:39 -0700)
committerJarno Rajahalme <jrajahalme@nicira.com>
Mon, 6 Oct 2014 22:33:47 +0000 (15:33 -0700)
Batching the cmap find improves the memory behavior with large cmaps
and can make searches twice as fast:

$ tests/ovstest test-cmap benchmark 2000000 8 0.1 16
Benchmarking with n=2000000, 8 threads, 0.10% mutations, batch size 16:
cmap insert:    533 ms
cmap iterate:    57 ms
batch search:   146 ms
cmap destroy:   233 ms

cmap insert:    552 ms
cmap iterate:    56 ms
cmap search:    299 ms
cmap destroy:   229 ms

hmap insert:    222 ms
hmap iterate:   198 ms
hmap search:   2061 ms
hmap destroy:   209 ms

Batch size 1 has small performance penalty, but all other batch sizes
are faster than non-batched cmap_find().  The batch size 16 was
experimentally found better than 8 or 32, so now
classifier_lookup_miniflow_batch() performs the cmap find operations
in batches of 16.

Signed-off-by: Jarno Rajahalme <jrajahalme@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
lib/bitmap.h
lib/classifier.c
lib/classifier.h
lib/cmap.c
lib/cmap.h
lib/dpif-netdev.c
tests/test-cmap.c

index ace091f..eb3d575 100644 (file)
@@ -114,4 +114,13 @@ bool bitmap_is_all_zeros(const unsigned long *, size_t n);
     for ((IDX) = bitmap_scan(BITMAP, 1, 0, SIZE); (IDX) < (SIZE);    \
          (IDX) = bitmap_scan(BITMAP, 1, (IDX) + 1, SIZE))
 
+/* More efficient access to a map of single ulong. */
+#define ULONG_FOR_EACH_1(IDX, MAP)                  \
+    for (unsigned long map__ = (MAP);               \
+         map__ && (((IDX) = raw_ctz(map__)), true); \
+         map__ = zero_rightmost_1bit(map__))
+
+#define ULONG_SET0(MAP, OFFSET) ((MAP) &= ~(1UL << (OFFSET)))
+#define ULONG_SET1(MAP, OFFSET) ((MAP) |= 1UL << (OFFSET))
+
 #endif /* bitmap.h */
index 3b28e14..a5ffe85 100644 (file)
@@ -980,26 +980,8 @@ miniflow_and_mask_matches_miniflow(const struct miniflow *flow,
     return true;
 }
 
-static inline struct cls_match *
-find_match_miniflow(const struct cls_subtable *subtable,
-                    const struct miniflow *flow,
-                    uint32_t hash)
-{
-    struct cls_match *rule;
-
-    CMAP_FOR_EACH_WITH_HASH (rule, cmap_node, hash, &subtable->rules) {
-        if (miniflow_and_mask_matches_miniflow(&rule->flow, &subtable->mask,
-                                               flow)) {
-            return rule;
-        }
-    }
-
-    return NULL;
-}
-
 /* For each miniflow in 'flows' performs a classifier lookup writing the result
- * into the corresponding slot in 'rules'.  If a particular entry in 'flows' is
- * NULL it is skipped.
+ * into the corresponding slot in 'rules'.
  *
  * This function is optimized for use in the userspace datapath and therefore
  * does not implement a lot of features available in the standard
@@ -1009,37 +991,79 @@ find_match_miniflow(const struct cls_subtable *subtable,
  * Returns true if all flows found a corresponding rule. */
 bool
 classifier_lookup_miniflow_batch(const struct classifier *cls,
-                                 const struct miniflow **flows,
-                                 struct cls_rule **rules, size_t len)
+                                 const struct miniflow *flows[],
+                                 struct cls_rule *rules[], const size_t cnt)
 {
+    /* The batch size 16 was experimentally found faster than 8 or 32. */
+    typedef uint16_t map_type;
+#define MAP_BITS (sizeof(map_type) * CHAR_BIT)
+
     struct cls_subtable *subtable;
-    size_t i, begin = 0;
+    const int n_maps = DIV_ROUND_UP(cnt, MAP_BITS);
+
+#if !defined(__CHECKER__) && !defined(_WIN32)
+    map_type maps[n_maps];
+#else
+    map_type maps[DIV_ROUND_UP(CLASSIFIER_MAX_BATCH, MAP_BITS)];
+    ovs_assert(n_maps <= CLASSIFIER_MAX_BATCH);
+#endif
+    BUILD_ASSERT_DECL(sizeof *maps * CHAR_BIT == MAP_BITS);
+
+    memset(maps, 0xff, sizeof maps);
+    if (cnt % MAP_BITS) {
+        maps[n_maps - 1] >>= MAP_BITS - cnt % MAP_BITS; /* Clear extra bits. */
+    }
+    memset(rules, 0, cnt * sizeof *rules);
 
-    memset(rules, 0, len * sizeof *rules);
     PVECTOR_FOR_EACH (subtable, &cls->subtables) {
-        for (i = begin; i < len; i++) {
-            struct cls_match *match;
-            uint32_t hash;
+        const struct miniflow **mfs = flows;
+        struct cls_rule **results = rules;
+        map_type remains = 0;
+        int m;
 
-            if (OVS_UNLIKELY(rules[i] || !flows[i])) {
-                continue;
-            }
+        BUILD_ASSERT_DECL(sizeof remains == sizeof *maps);
+
+        for (m = 0; m < n_maps; m++, mfs += MAP_BITS, results += MAP_BITS) {
+            uint32_t hashes[MAP_BITS];
+            const struct cmap_node *nodes[MAP_BITS];
+            unsigned long map = maps[m];
+            int i;
 
-            hash = miniflow_hash_in_minimask(flows[i], &subtable->mask, 0);
-            match = find_match_miniflow(subtable, flows[i], hash);
-            if (OVS_UNLIKELY(match)) {
-                rules[i] = match->cls_rule;
+            if (!map) {
+                continue; /* Skip empty ones. */
             }
-        }
 
-        while (begin < len && (rules[begin] || !flows[begin])) {
-            begin++;
+            /* Compute hashes for the unfound flows. */
+            ULONG_FOR_EACH_1(i, map) {
+                hashes[i] = miniflow_hash_in_minimask(mfs[i], &subtable->mask,
+                                                      0);
+            }
+            /* Lookup. */
+            map = cmap_find_batch(&subtable->rules, map, hashes, nodes);
+            /* Check results. */
+            ULONG_FOR_EACH_1(i, map) {
+                struct cls_match *rule;
+
+                CMAP_NODE_FOR_EACH (rule, cmap_node, nodes[i]) {
+                    if (OVS_LIKELY(miniflow_and_mask_matches_miniflow(
+                                       &rule->flow, &subtable->mask,
+                                       mfs[i]))) {
+                        results[i] = rule->cls_rule;
+                        goto next;
+                    }
+                }
+                ULONG_SET0(map, i); /* Did not match. */
+            next:
+                ; /* Keep Sparse happy. */
+            }
+            maps[m] &= ~map; /* Clear the found rules. */
+            remains |= maps[m];
         }
-        if (begin >= len) {
-            return true;
+        if (!remains) {
+            return true; /* All found. */
         }
     }
-
+    /* Some misses. */
     return false;
 }
 
index d6ab144..f75d242 100644 (file)
@@ -297,7 +297,9 @@ struct cls_rule *classifier_lookup(const struct classifier *,
                                    struct flow_wildcards *);
 bool classifier_lookup_miniflow_batch(const struct classifier *cls,
                                       const struct miniflow **flows,
-                                      struct cls_rule **rules, size_t len);
+                                      struct cls_rule **rules,
+                                      const size_t cnt);
+enum { CLASSIFIER_MAX_BATCH = 256 };
 bool classifier_rule_overlaps(const struct classifier *,
                               const struct cls_rule *);
 
index 042cfce..a7a25f1 100644 (file)
@@ -16,6 +16,7 @@
 
 #include <config.h>
 #include "cmap.h"
+#include "bitmap.h"
 #include "hash.h"
 #include "ovs-rcu.h"
 #include "random.h"
@@ -275,15 +276,17 @@ static inline bool
 counter_changed(const struct cmap_bucket *b_, uint32_t c)
 {
     struct cmap_bucket *b = CONST_CAST(struct cmap_bucket *, b_);
+    uint32_t counter;
 
     /* Need to make sure the counter read is not moved up, before the hash and
-     * cmap_node_next().  The atomic_read_explicit() with memory_order_acquire
-     * in read_counter() still allows prior reads to be moved after the
-     * barrier.  atomic_thread_fence prevents all following memory accesses
-     * from moving prior to preceding loads. */
+     * cmap_node_next().  Using atomic_read_explicit with memory_order_acquire
+     * would allow prior reads to be moved after the barrier.
+     * atomic_thread_fence prevents all following memory accesses from moving
+     * prior to preceding loads. */
     atomic_thread_fence(memory_order_acquire);
+    atomic_read_relaxed(&b->counter, &counter);
 
-    return OVS_UNLIKELY(read_counter(b) != c);
+    return OVS_UNLIKELY(counter != c);
 }
 
 static inline const struct cmap_node *
@@ -345,6 +348,92 @@ cmap_find(const struct cmap *cmap, uint32_t hash)
                        hash);
 }
 
+/* Looks up multiple 'hashes', when the corresponding bit in 'map' is 1,
+ * and sets the corresponding pointer in 'nodes', if the hash value was
+ * found from the 'cmap'.  In other cases the 'nodes' values are not changed,
+ * i.e., no NULL pointers are stored there.
+ * Returns a map where a bit is set to 1 if the corresponding 'nodes' pointer
+ * was stored, 0 otherwise.
+ * Generally, the caller wants to use CMAP_NODE_FOR_EACH to verify for
+ * hash collisions. */
+unsigned long
+cmap_find_batch(const struct cmap *cmap, unsigned long map,
+                uint32_t hashes[], const struct cmap_node *nodes[])
+{
+    const struct cmap_impl *impl = cmap_get_impl(cmap);
+    unsigned long result = map;
+    int i;
+    uint32_t h1s[sizeof map * CHAR_BIT];
+    const struct cmap_bucket *b1s[sizeof map * CHAR_BIT];
+    const struct cmap_bucket *b2s[sizeof map * CHAR_BIT];
+    uint32_t c1s[sizeof map * CHAR_BIT];
+
+    /* Compute hashes and prefetch 1st buckets. */
+    ULONG_FOR_EACH_1(i, map) {
+        h1s[i] = rehash(impl, hashes[i]);
+        b1s[i] = &impl->buckets[h1s[i] & impl->mask];
+        OVS_PREFETCH(b1s[i]);
+    }
+    /* Lookups, Round 1. Only look up at the first bucket. */
+    ULONG_FOR_EACH_1(i, map) {
+        uint32_t c1;
+        const struct cmap_bucket *b1 = b1s[i];
+        const struct cmap_node *node;
+
+        do {
+            c1 = read_even_counter(b1);
+            node = cmap_find_in_bucket(b1, hashes[i]);
+        } while (OVS_UNLIKELY(counter_changed(b1, c1)));
+
+        if (!node) {
+            /* Not found (yet); Prefetch the 2nd bucket. */
+            b2s[i] = &impl->buckets[other_hash(h1s[i]) & impl->mask];
+            OVS_PREFETCH(b2s[i]);
+            c1s[i] = c1; /* We may need to check this after Round 2. */
+            continue;
+        }
+        /* Found. */
+        ULONG_SET0(map, i); /* Ignore this on round 2. */
+        OVS_PREFETCH(node);
+        nodes[i] = node;
+    }
+    /* Round 2. Look into the 2nd bucket, if needed. */
+    ULONG_FOR_EACH_1(i, map) {
+        uint32_t c2;
+        const struct cmap_bucket *b2 = b2s[i];
+        const struct cmap_node *node;
+
+        do {
+            c2 = read_even_counter(b2);
+            node = cmap_find_in_bucket(b2, hashes[i]);
+        } while (OVS_UNLIKELY(counter_changed(b2, c2)));
+
+        if (!node) {
+            /* Not found, but the node may have been moved from b2 to b1 right
+             * after we finished with b1 earlier.  We just got a clean reading
+             * of the 2nd bucket, so we check the counter of the 1st bucket
+             * only.  However, we need to check both buckets again, as the
+             * entry may be moved again to the 2nd bucket.  Basically, we
+             * need to loop as long as it takes to get stable readings of
+             * both buckets.  cmap_find__() does that, and now that we have
+             * fetched both buckets we can just use it. */
+            if (OVS_UNLIKELY(counter_changed(b1s[i], c1s[i]))) {
+                node = cmap_find__(b1s[i], b2s[i], hashes[i]);
+                if (node) {
+                    goto found;
+                }
+            }
+            /* Not found. */
+            ULONG_SET0(result, i); /* Fix the result. */
+            continue;
+        }
+found:
+        OVS_PREFETCH(node);
+        nodes[i] = node;
+    }
+    return result;
+}
+
 static int
 cmap_find_slot_protected(struct cmap_bucket *b, uint32_t hash)
 {
index 485014c..c653424 100644 (file)
@@ -106,6 +106,14 @@ size_t cmap_replace(struct cmap *, struct cmap_node *old_node,
  * Thread-safety
  * =============
  *
+ * CMAP_NODE_FOR_EACH will reliably visit each of the nodes starting with
+ * CMAP_NODE, even with concurrent insertions and deletions.  (Of
+ * course, if nodes are being inserted or deleted, it might or might not visit
+ * the nodes actually being inserted or deleted.)
+ *
+ * CMAP_NODE_FOR_EACH_PROTECTED may only be used if the containing CMAP is
+ * guaranteed not to change during iteration.  It may be only slightly faster.
+ *
  * CMAP_FOR_EACH_WITH_HASH will reliably visit each of the nodes with the
  * specified hash in CMAP, even with concurrent insertions and deletions.  (Of
  * course, if nodes with the given HASH are being inserted or deleted, it might
@@ -114,19 +122,35 @@ size_t cmap_replace(struct cmap *, struct cmap_node *old_node,
  * CMAP_FOR_EACH_WITH_HASH_PROTECTED may only be used if CMAP is guaranteed not
  * to change during iteration.  It may be very slightly faster.
  */
-#define CMAP_FOR_EACH_WITH_HASH(NODE, MEMBER, HASH, CMAP)       \
-    for (INIT_CONTAINER(NODE, cmap_find(CMAP, HASH), MEMBER);   \
-         (NODE) != OBJECT_CONTAINING(NULL, NODE, MEMBER);       \
+#define CMAP_NODE_FOR_EACH(NODE, MEMBER, CMAP_NODE)                     \
+    for (INIT_CONTAINER(NODE, CMAP_NODE, MEMBER);                       \
+         (NODE) != OBJECT_CONTAINING(NULL, NODE, MEMBER);               \
          ASSIGN_CONTAINER(NODE, cmap_node_next(&(NODE)->MEMBER), MEMBER))
-#define CMAP_FOR_EACH_WITH_HASH_PROTECTED(NODE, MEMBER, HASH, CMAP)        \
-    for (INIT_CONTAINER(NODE, cmap_find_locked(CMAP, HASH), MEMBER);    \
+#define CMAP_NODE_FOR_EACH_PROTECTED(NODE, MEMBER, CMAP_NODE)           \
+    for (INIT_CONTAINER(NODE, CMAP_NODE, MEMBER);                       \
          (NODE) != OBJECT_CONTAINING(NULL, NODE, MEMBER);               \
          ASSIGN_CONTAINER(NODE, cmap_node_next_protected(&(NODE)->MEMBER), \
                           MEMBER))
+#define CMAP_FOR_EACH_WITH_HASH(NODE, MEMBER, HASH, CMAP)   \
+    CMAP_NODE_FOR_EACH(NODE, MEMBER, cmap_find(CMAP, HASH))
+#define CMAP_FOR_EACH_WITH_HASH_PROTECTED(NODE, MEMBER, HASH, CMAP)     \
+    CMAP_NODE_FOR_EACH_PROTECTED(NODE, MEMBER, cmap_find_locked(CMAP, HASH))
 
 const struct cmap_node *cmap_find(const struct cmap *, uint32_t hash);
 struct cmap_node *cmap_find_protected(const struct cmap *, uint32_t hash);
 
+/* Looks up multiple 'hashes', when the corresponding bit in 'map' is 1,
+ * and sets the corresponding pointer in 'nodes', if the hash value was
+ * found from the 'cmap'.  In other cases the 'nodes' values are not changed,
+ * i.e., no NULL pointers are stored there.
+ * Returns a map where a bit is set to 1 if the corresponding 'nodes' pointer
+ * was stored, 0 otherwise.
+ * Generally, the caller wants to use CMAP_NODE_FOR_EACH to verify for
+ * hash collisions. */
+unsigned long cmap_find_batch(const struct cmap *cmap, unsigned long map,
+                              uint32_t hashes[],
+                              const struct cmap_node *nodes[]);
+
 /* Iteration.
  *
  *
index 3a1ecc6..bd1fb9d 100644 (file)
@@ -2644,7 +2644,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
     enum { PKT_ARRAY_SIZE = NETDEV_MAX_RX_BATCH };
 #endif
     struct packet_batch batches[PKT_ARRAY_SIZE];
-    const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* NULL at bad packets. */
+    const struct miniflow *mfs[PKT_ARRAY_SIZE]; /* May NOT be NULL. */
     struct cls_rule *rules[PKT_ARRAY_SIZE];
     struct dp_netdev *dp = pmd->dp;
     struct emc_cache *flow_cache = &pmd->flow_cache;
@@ -2652,7 +2652,7 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
     bool any_miss;
 
     for (i = 0; i < cnt; i++) {
-        mfs[i] = &keys[i].flow;
+        mfs[i] = &keys[i].flow; /* No bad packets! */
     }
     any_miss = !classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
     if (OVS_UNLIKELY(any_miss) && !fat_rwlock_tryrdlock(&dp->upcall_rwlock)) {
index 3838e99..2976d5f 100644 (file)
@@ -18,6 +18,7 @@
  * cmap.h. */
 
 #include <config.h>
+#include "bitmap.h"
 #include "cmap.h"
 #include <getopt.h>
 #include <string.h>
@@ -57,7 +58,7 @@ check_cmap(struct cmap *cmap, const int values[], size_t n,
 {
     int *sort_values, *cmap_values, *cmap_values2;
     const struct element *e;
-    size_t i;
+    size_t i, batch_size;
 
     struct cmap_position pos = { 0, 0, 0 };
     struct cmap_node *node;
@@ -110,6 +111,33 @@ check_cmap(struct cmap *cmap, const int values[], size_t n,
         assert(count == 1);
     }
 
+    /* Check that all the values are there in batched lookup. */
+    batch_size = n % BITMAP_ULONG_BITS + 1;
+    for (i = 0; i < n; i += batch_size) {
+        unsigned long map;
+        uint32_t hashes[sizeof map * CHAR_BIT];
+        const struct cmap_node *nodes[sizeof map * CHAR_BIT];
+        size_t count = 0;
+        int k, j;
+
+        j = MIN(n - i, batch_size); /* Actual batch size. */
+        map = ~0UL >> (BITMAP_ULONG_BITS - j);
+
+        for (k = 0; k < j; k++) {
+            hashes[k] = hash(values[i + k]);
+        }
+        map = cmap_find_batch(cmap, map, hashes, nodes);
+
+        ULONG_FOR_EACH_1(k, map) {
+            struct element *e;
+
+            CMAP_NODE_FOR_EACH (e, node, nodes[k]) {
+                count += e->value == values[i + k];
+            }
+        }
+        assert(count == j); /* j elements in a batch. */
+    }
+
     /* Check that cmap_first() returns NULL only when cmap_is_empty(). */
     assert(!cmap_first(cmap) == cmap_is_empty(cmap));
 
@@ -237,8 +265,12 @@ run_tests(int argc, char *argv[])
 static int n_elems;             /* Number of elements to insert. */
 static int n_threads;           /* Number of threads to search and mutate. */
 static uint32_t mutation_frac;  /* % mutations, as fraction of UINT32_MAX. */
+static int n_batch;             /* Number of elements in each batch. */
+
+#define N_BATCH_MAX BITMAP_ULONG_BITS
 
 static void benchmark_cmap(void);
+static void benchmark_cmap_batched(void);
 static void benchmark_hmap(void);
 
 static int
@@ -251,15 +283,24 @@ elapsed(const struct timeval *start)
 }
 
 static void
-run_benchmarks(int argc OVS_UNUSED, char *argv[] OVS_UNUSED)
+run_benchmarks(int argc, char *argv[] OVS_UNUSED)
 {
     n_elems = strtol(argv[1], NULL, 10);
     n_threads = strtol(argv[2], NULL, 10);
     mutation_frac = strtod(argv[3], NULL) / 100.0 * UINT32_MAX;
+    n_batch = argc > 4 ? strtol(argv[4], NULL, 10) : 1;
 
-    printf("Benchmarking with n=%d, %d threads, %.2f%% mutations:\n",
-           n_elems, n_threads, (double) mutation_frac / UINT32_MAX * 100.);
+    if (n_batch > N_BATCH_MAX) {
+        n_batch = N_BATCH_MAX;
+    }
+    printf("Benchmarking with n=%d, %d threads, %.2f%% mutations, batch size %d:\n",
+           n_elems, n_threads, (double) mutation_frac / UINT32_MAX * 100.,
+           n_batch);
 
+    if (n_batch > 0) {
+        benchmark_cmap_batched();
+    }
+    putchar('\n');
     benchmark_cmap();
     putchar('\n');
     benchmark_hmap();
@@ -367,6 +408,127 @@ benchmark_cmap(void)
 
     free(elements);
 }
+
+static size_t
+find_batch(const struct cmap *cmap, const int value)
+{
+    size_t i, ret;
+    const size_t end = MIN(n_batch, n_elems - value);
+    unsigned long map = ~0;
+    uint32_t hashes[N_BATCH_MAX];
+    const struct cmap_node *nodes[N_BATCH_MAX];
+
+    if (mutation_frac) {
+        for (i = 0; i < end; i++) {
+            if (random_uint32() < mutation_frac) {
+                break;
+            }
+            hashes[i] = hash_int(value + i, 0);
+        }
+    } else {
+        for (i = 0; i < end; i++) {
+            hashes[i] = hash_int(value + i, 0);
+        }
+    }
+
+    ret = i;
+
+    map >>= BITMAP_ULONG_BITS - i; /* Clear excess bits. */
+    map = cmap_find_batch(cmap, map, hashes, nodes);
+
+    ULONG_FOR_EACH_1(i, map) {
+        struct element *e;
+
+        CMAP_NODE_FOR_EACH (e, node, nodes[i]) {
+            if (OVS_LIKELY(e->value == value + i)) {
+                ignore(e); /* Found result. */
+                break;
+            }
+        }
+    }
+    return ret;
+}
+
+static void *
+search_cmap_batched(void *aux_)
+{
+    struct cmap_aux *aux = aux_;
+    size_t i = 0, j;
+
+    for (;;) {
+        struct element *e;
+
+        j = find_batch(aux->cmap, i);
+        i += j;
+        if (i >= n_elems) {
+            break;
+        }
+        if (j < n_batch) {
+            ovs_mutex_lock(&aux->mutex);
+            e = find(aux->cmap, i);
+            if (e) {
+                cmap_remove(aux->cmap, &e->node, hash_int(e->value, 0));
+            }
+            ovs_mutex_unlock(&aux->mutex);
+        }
+    }
+
+    return NULL;
+}
+
+static void
+benchmark_cmap_batched(void)
+{
+    struct element *elements;
+    struct cmap cmap;
+    struct element *e;
+    struct timeval start;
+    pthread_t *threads;
+    struct cmap_aux aux;
+    size_t i;
+
+    elements = xmalloc(n_elems * sizeof *elements);
+
+    /* Insertions. */
+    xgettimeofday(&start);
+    cmap_init(&cmap);
+    for (i = 0; i < n_elems; i++) {
+        elements[i].value = i;
+        cmap_insert(&cmap, &elements[i].node, hash_int(i, 0));
+    }
+    printf("cmap insert:  %5d ms\n", elapsed(&start));
+
+    /* Iteration. */
+    xgettimeofday(&start);
+    CMAP_FOR_EACH (e, node, &cmap) {
+        ignore(e);
+    }
+    printf("cmap iterate: %5d ms\n", elapsed(&start));
+
+    /* Search and mutation. */
+    xgettimeofday(&start);
+    aux.cmap = &cmap;
+    ovs_mutex_init(&aux.mutex);
+    threads = xmalloc(n_threads * sizeof *threads);
+    for (i = 0; i < n_threads; i++) {
+        threads[i] = ovs_thread_create("search", search_cmap_batched, &aux);
+    }
+    for (i = 0; i < n_threads; i++) {
+        xpthread_join(threads[i], NULL);
+    }
+    free(threads);
+    printf("batch search: %5d ms\n", elapsed(&start));
+
+    /* Destruction. */
+    xgettimeofday(&start);
+    CMAP_FOR_EACH (e, node, &cmap) {
+        cmap_remove(&cmap, &e->node, hash_int(e->value, 0));
+    }
+    cmap_destroy(&cmap);
+    printf("cmap destroy: %5d ms\n", elapsed(&start));
+
+    free(elements);
+}
 \f
 /* hmap benchmark. */
 struct helement {
@@ -477,7 +639,7 @@ benchmark_hmap(void)
 \f
 static const struct command commands[] = {
     {"check", 0, 1, run_tests},
-    {"benchmark", 3, 3, run_benchmarks},
+    {"benchmark", 3, 4, run_benchmarks},
     {NULL, 0, 0, NULL},
 };