9b8f8a043570ef2df414eb32f3d12f04aba497d7
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "cmap.h"
25 #include "dpif.h"
26 #include "dynamic-string.h"
27 #include "fail-open.h"
28 #include "guarded-list.h"
29 #include "latch.h"
30 #include "list.h"
31 #include "netlink.h"
32 #include "ofpbuf.h"
33 #include "ofproto-dpif-ipfix.h"
34 #include "ofproto-dpif-sflow.h"
35 #include "ofproto-dpif-xlate.h"
36 #include "ovs-rcu.h"
37 #include "packets.h"
38 #include "poll-loop.h"
39 #include "seq.h"
40 #include "unixctl.h"
41 #include "vlog.h"
42
43 #define MAX_QUEUE_LENGTH 512
44 #define UPCALL_MAX_BATCH 64
45 #define REVALIDATE_MAX_BATCH 50
46
47 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
48
49 COVERAGE_DEFINE(dumped_duplicate_flow);
50 COVERAGE_DEFINE(dumped_new_flow);
51 COVERAGE_DEFINE(handler_duplicate_upcall);
52 COVERAGE_DEFINE(upcall_ukey_contention);
53 COVERAGE_DEFINE(revalidate_missed_dp_flow);
54
55 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
56  * and possibly sets up a kernel flow as a cache. */
57 struct handler {
58     struct udpif *udpif;               /* Parent udpif. */
59     pthread_t thread;                  /* Thread ID. */
60     uint32_t handler_id;               /* Handler id. */
61 };
62
63 /* In the absence of a multiple-writer multiple-reader datastructure for
64  * storing ukeys, we use a large number of cmaps, each with its own lock for
65  * writing. */
66 #define N_UMAPS 512 /* per udpif. */
67 struct umap {
68     struct ovs_mutex mutex;            /* Take for writing to the following. */
69     struct cmap cmap;                  /* Datapath flow keys. */
70 };
71
72 /* A thread that processes datapath flows, updates OpenFlow statistics, and
73  * updates or removes them if necessary. */
74 struct revalidator {
75     struct udpif *udpif;               /* Parent udpif. */
76     pthread_t thread;                  /* Thread ID. */
77     unsigned int id;                   /* ovsthread_id_self(). */
78 };
79
80 /* An upcall handler for ofproto_dpif.
81  *
82  * udpif keeps records of two kind of logically separate units:
83  *
84  * upcall handling
85  * ---------------
86  *
87  *    - An array of 'struct handler's for upcall handling and flow
88  *      installation.
89  *
90  * flow revalidation
91  * -----------------
92  *
93  *    - Revalidation threads which read the datapath flow table and maintains
94  *      them.
95  */
96 struct udpif {
97     struct list list_node;             /* In all_udpifs list. */
98
99     struct dpif *dpif;                 /* Datapath handle. */
100     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
101
102     uint32_t secret;                   /* Random seed for upcall hash. */
103
104     struct handler *handlers;          /* Upcall handlers. */
105     size_t n_handlers;
106
107     struct revalidator *revalidators;  /* Flow revalidators. */
108     size_t n_revalidators;
109
110     struct latch exit_latch;           /* Tells child threads to exit. */
111
112     /* Revalidation. */
113     struct seq *reval_seq;             /* Incremented to force revalidation. */
114     bool reval_exit;                   /* Set by leader on 'exit_latch. */
115     struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
116     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
117     long long int dump_duration;       /* Duration of the last flow dump. */
118     struct seq *dump_seq;              /* Increments each dump iteration. */
119
120     /* There are 'N_UMAPS' maps containing 'struct udpif_key' elements.
121      *
122      * During the flow dump phase, revalidators insert into these with a random
123      * distribution. During the garbage collection phase, each revalidator
124      * takes care of garbage collecting a slice of these maps. */
125     struct umap *ukeys;
126
127     /* Datapath flow statistics. */
128     unsigned int max_n_flows;
129     unsigned int avg_n_flows;
130
131     /* Following fields are accessed and modified by different threads. */
132     atomic_uint flow_limit;            /* Datapath flow hard limit. */
133
134     /* n_flows_mutex prevents multiple threads updating these concurrently. */
135     atomic_uint n_flows;               /* Number of flows in the datapath. */
136     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
137     struct ovs_mutex n_flows_mutex;
138
139     /* Following fields are accessed and modified only from the main thread. */
140     struct unixctl_conn **conns;       /* Connections waiting on dump_seq. */
141     uint64_t conn_seq;                 /* Corresponds to 'dump_seq' when
142                                           conns[n_conns-1] was stored. */
143     size_t n_conns;                    /* Number of connections waiting. */
144 };
145
146 enum upcall_type {
147     BAD_UPCALL,                 /* Some kind of bug somewhere. */
148     MISS_UPCALL,                /* A flow miss.  */
149     SFLOW_UPCALL,               /* sFlow sample. */
150     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
151     IPFIX_UPCALL                /* Per-bridge sampling. */
152 };
153
154 struct upcall {
155     struct ofproto_dpif *ofproto;  /* Parent ofproto. */
156
157     /* The flow and packet are only required to be constant when using
158      * dpif-netdev.  If a modification is absolutely necessary, a const cast
159      * may be used with other datapaths. */
160     const struct flow *flow;       /* Parsed representation of the packet. */
161     const struct ofpbuf *packet;   /* Packet associated with this upcall. */
162     ofp_port_t in_port;            /* OpenFlow in port, or OFPP_NONE. */
163
164     enum dpif_upcall_type type;    /* Datapath type of the upcall. */
165     const struct nlattr *userdata; /* Userdata for DPIF_UC_ACTION Upcalls. */
166
167     bool xout_initialized;         /* True if 'xout' must be uninitialized. */
168     struct xlate_out xout;         /* Result of xlate_actions(). */
169     struct ofpbuf put_actions;     /* Actions 'put' in the fastapath. */
170
171     struct dpif_ipfix *ipfix;      /* IPFIX pointer or NULL. */
172     struct dpif_sflow *sflow;      /* SFlow pointer or NULL. */
173
174     bool vsp_adjusted;             /* 'packet' and 'flow' were adjusted for
175                                       VLAN splinters if true. */
176
177     struct udpif_key *ukey;        /* Revalidator flow cache. */
178     bool ukey_persists;            /* Set true to keep 'ukey' beyond the
179                                       lifetime of this upcall. */
180
181     uint64_t dump_seq;             /* udpif->dump_seq at translation time. */
182     uint64_t reval_seq;            /* udpif->reval_seq at translation time. */
183
184     /* Not used by the upcall callback interface. */
185     const struct nlattr *key;      /* Datapath flow key. */
186     size_t key_len;                /* Datapath flow key length. */
187     const struct nlattr *out_tun_key;  /* Datapath output tunnel key. */
188 };
189
190 /* 'udpif_key's are responsible for tracking the little bit of state udpif
191  * needs to do flow expiration which can't be pulled directly from the
192  * datapath.  They may be created by any handler or revalidator thread at any
193  * time, and read by any revalidator during the dump phase. They are however
194  * each owned by a single revalidator which takes care of destroying them
195  * during the garbage-collection phase.
196  *
197  * The mutex within the ukey protects some members of the ukey. The ukey
198  * itself is protected by RCU and is held within a umap in the parent udpif.
199  * Adding or removing a ukey from a umap is only safe when holding the
200  * corresponding umap lock. */
201 struct udpif_key {
202     struct cmap_node cmap_node;     /* In parent revalidator 'ukeys' map. */
203
204     /* These elements are read only once created, and therefore aren't
205      * protected by a mutex. */
206     const struct nlattr *key;      /* Datapath flow key. */
207     size_t key_len;                /* Length of 'key'. */
208     const struct nlattr *mask;     /* Datapath flow mask. */
209     size_t mask_len;               /* Length of 'mask'. */
210     struct ofpbuf *actions;        /* Datapath flow actions as nlattrs. */
211     uint32_t hash;                 /* Pre-computed hash for 'key'. */
212
213     struct ovs_mutex mutex;                   /* Guards the following. */
214     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
215     long long int created OVS_GUARDED;        /* Estimate of creation time. */
216     uint64_t dump_seq OVS_GUARDED;            /* Tracks udpif->dump_seq. */
217     uint64_t reval_seq OVS_GUARDED;           /* Tracks udpif->reval_seq. */
218     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
219                                                  once. */
220
221     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
222                                                * are affected by this ukey.
223                                                * Used for stats and learning.*/
224     union {
225         struct odputil_keybuf buf;
226         struct nlattr nla;
227     } keybuf, maskbuf;
228 };
229
230 /* Datapath operation with optional ukey attached. */
231 struct ukey_op {
232     struct udpif_key *ukey;
233     struct dpif_flow_stats stats; /* Stats for 'op'. */
234     struct dpif_op dop;           /* Flow operation. */
235 };
236
237 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
238 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
239
240 static size_t recv_upcalls(struct handler *);
241 static int process_upcall(struct udpif *, struct upcall *,
242                           struct ofpbuf *odp_actions);
243 static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
244 static void udpif_stop_threads(struct udpif *);
245 static void udpif_start_threads(struct udpif *, size_t n_handlers,
246                                 size_t n_revalidators);
247 static void *udpif_upcall_handler(void *);
248 static void *udpif_revalidator(void *);
249 static unsigned long udpif_get_n_flows(struct udpif *);
250 static void revalidate(struct revalidator *);
251 static void revalidator_sweep(struct revalidator *);
252 static void revalidator_purge(struct revalidator *);
253 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
254                                 const char *argv[], void *aux);
255 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
256                                              const char *argv[], void *aux);
257 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
258                                             const char *argv[], void *aux);
259 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
260                                             const char *argv[], void *aux);
261 static void upcall_unixctl_dump_wait(struct unixctl_conn *conn, int argc,
262                                      const char *argv[], void *aux);
263 static void upcall_unixctl_purge(struct unixctl_conn *conn, int argc,
264                                  const char *argv[], void *aux);
265
266 static struct udpif_key *ukey_create_from_upcall(const struct udpif *,
267                                                  const struct upcall *);
268 static struct udpif_key *ukey_create_from_dpif_flow(const struct udpif *,
269                                                     const struct dpif_flow *);
270 static bool ukey_install_start(struct udpif *, struct udpif_key *ukey);
271 static bool ukey_install_finish(struct udpif_key *ukey, int error);
272 static bool ukey_install(struct udpif *udpif, struct udpif_key *ukey);
273 static struct udpif_key *ukey_lookup(struct udpif *udpif, uint32_t hash,
274                                      const struct nlattr *key, size_t key_len);
275 static int ukey_acquire(struct udpif *, const struct dpif_flow *,
276                         struct udpif_key **result);
277 static void ukey_delete__(struct udpif_key *);
278 static void ukey_delete(struct umap *, struct udpif_key *);
279 static enum upcall_type classify_upcall(enum dpif_upcall_type type,
280                                         const struct nlattr *userdata);
281
282 static int upcall_receive(struct upcall *, const struct dpif_backer *,
283                           const struct ofpbuf *packet, enum dpif_upcall_type,
284                           const struct nlattr *userdata, const struct flow *);
285 static void upcall_uninit(struct upcall *);
286
287 static upcall_callback upcall_cb;
288
289 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
290
291 struct udpif *
292 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
293 {
294     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
295     struct udpif *udpif = xzalloc(sizeof *udpif);
296
297     if (ovsthread_once_start(&once)) {
298         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
299                                  NULL);
300         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
301                                  upcall_unixctl_disable_megaflows, NULL);
302         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
303                                  upcall_unixctl_enable_megaflows, NULL);
304         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
305                                  upcall_unixctl_set_flow_limit, NULL);
306         unixctl_command_register("revalidator/wait", "", 0, 0,
307                                  upcall_unixctl_dump_wait, NULL);
308         unixctl_command_register("revalidator/purge", "", 0, 0,
309                                  upcall_unixctl_purge, NULL);
310         ovsthread_once_done(&once);
311     }
312
313     udpif->dpif = dpif;
314     udpif->backer = backer;
315     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
316     udpif->secret = random_uint32();
317     udpif->reval_seq = seq_create();
318     udpif->dump_seq = seq_create();
319     latch_init(&udpif->exit_latch);
320     list_push_back(&all_udpifs, &udpif->list_node);
321     atomic_init(&udpif->n_flows, 0);
322     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
323     ovs_mutex_init(&udpif->n_flows_mutex);
324     udpif->ukeys = xmalloc(N_UMAPS * sizeof *udpif->ukeys);
325     for (int i = 0; i < N_UMAPS; i++) {
326         cmap_init(&udpif->ukeys[i].cmap);
327         ovs_mutex_init(&udpif->ukeys[i].mutex);
328     }
329
330     dpif_register_upcall_cb(dpif, upcall_cb, udpif);
331
332     return udpif;
333 }
334
335 void
336 udpif_run(struct udpif *udpif)
337 {
338     if (udpif->conns && udpif->conn_seq != seq_read(udpif->dump_seq)) {
339         int i;
340
341         for (i = 0; i < udpif->n_conns; i++) {
342             unixctl_command_reply(udpif->conns[i], NULL);
343         }
344         free(udpif->conns);
345         udpif->conns = NULL;
346         udpif->n_conns = 0;
347     }
348 }
349
350 void
351 udpif_destroy(struct udpif *udpif)
352 {
353     udpif_stop_threads(udpif);
354
355     for (int i = 0; i < N_UMAPS; i++) {
356         cmap_destroy(&udpif->ukeys[i].cmap);
357         ovs_mutex_destroy(&udpif->ukeys[i].mutex);
358     }
359     free(udpif->ukeys);
360     udpif->ukeys = NULL;
361
362     list_remove(&udpif->list_node);
363     latch_destroy(&udpif->exit_latch);
364     seq_destroy(udpif->reval_seq);
365     seq_destroy(udpif->dump_seq);
366     ovs_mutex_destroy(&udpif->n_flows_mutex);
367     free(udpif);
368 }
369
370 /* Stops the handler and revalidator threads, must be enclosed in
371  * ovsrcu quiescent state unless when destroying udpif. */
372 static void
373 udpif_stop_threads(struct udpif *udpif)
374 {
375     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
376         size_t i;
377
378         latch_set(&udpif->exit_latch);
379
380         for (i = 0; i < udpif->n_handlers; i++) {
381             struct handler *handler = &udpif->handlers[i];
382
383             xpthread_join(handler->thread, NULL);
384         }
385
386         for (i = 0; i < udpif->n_revalidators; i++) {
387             xpthread_join(udpif->revalidators[i].thread, NULL);
388         }
389
390         dpif_disable_upcall(udpif->dpif);
391
392         for (i = 0; i < udpif->n_revalidators; i++) {
393             struct revalidator *revalidator = &udpif->revalidators[i];
394
395             /* Delete ukeys, and delete all flows from the datapath to prevent
396              * double-counting stats. */
397             revalidator_purge(revalidator);
398         }
399
400         latch_poll(&udpif->exit_latch);
401
402         ovs_barrier_destroy(&udpif->reval_barrier);
403
404         free(udpif->revalidators);
405         udpif->revalidators = NULL;
406         udpif->n_revalidators = 0;
407
408         free(udpif->handlers);
409         udpif->handlers = NULL;
410         udpif->n_handlers = 0;
411     }
412 }
413
414 /* Starts the handler and revalidator threads, must be enclosed in
415  * ovsrcu quiescent state. */
416 static void
417 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
418                     size_t n_revalidators)
419 {
420     if (udpif && n_handlers && n_revalidators) {
421         size_t i;
422
423         udpif->n_handlers = n_handlers;
424         udpif->n_revalidators = n_revalidators;
425
426         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
427         for (i = 0; i < udpif->n_handlers; i++) {
428             struct handler *handler = &udpif->handlers[i];
429
430             handler->udpif = udpif;
431             handler->handler_id = i;
432             handler->thread = ovs_thread_create(
433                 "handler", udpif_upcall_handler, handler);
434         }
435
436         dpif_enable_upcall(udpif->dpif);
437
438         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
439         udpif->reval_exit = false;
440         udpif->revalidators = xzalloc(udpif->n_revalidators
441                                       * sizeof *udpif->revalidators);
442         for (i = 0; i < udpif->n_revalidators; i++) {
443             struct revalidator *revalidator = &udpif->revalidators[i];
444
445             revalidator->udpif = udpif;
446             revalidator->thread = ovs_thread_create(
447                 "revalidator", udpif_revalidator, revalidator);
448         }
449     }
450 }
451
452 /* Tells 'udpif' how many threads it should use to handle upcalls.
453  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
454  * datapath handle must have packet reception enabled before starting
455  * threads. */
456 void
457 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
458                   size_t n_revalidators)
459 {
460     ovs_assert(udpif);
461     ovs_assert(n_handlers && n_revalidators);
462
463     ovsrcu_quiesce_start();
464     if (udpif->n_handlers != n_handlers
465         || udpif->n_revalidators != n_revalidators) {
466         udpif_stop_threads(udpif);
467     }
468
469     if (!udpif->handlers && !udpif->revalidators) {
470         int error;
471
472         error = dpif_handlers_set(udpif->dpif, n_handlers);
473         if (error) {
474             VLOG_ERR("failed to configure handlers in dpif %s: %s",
475                      dpif_name(udpif->dpif), ovs_strerror(error));
476             return;
477         }
478
479         udpif_start_threads(udpif, n_handlers, n_revalidators);
480     }
481     ovsrcu_quiesce_end();
482 }
483
484 /* Waits for all ongoing upcall translations to complete.  This ensures that
485  * there are no transient references to any removed ofprotos (or other
486  * objects).  In particular, this should be called after an ofproto is removed
487  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
488 void
489 udpif_synchronize(struct udpif *udpif)
490 {
491     /* This is stronger than necessary.  It would be sufficient to ensure
492      * (somehow) that each handler and revalidator thread had passed through
493      * its main loop once. */
494     size_t n_handlers = udpif->n_handlers;
495     size_t n_revalidators = udpif->n_revalidators;
496
497     ovsrcu_quiesce_start();
498     udpif_stop_threads(udpif);
499     udpif_start_threads(udpif, n_handlers, n_revalidators);
500     ovsrcu_quiesce_end();
501 }
502
503 /* Notifies 'udpif' that something changed which may render previous
504  * xlate_actions() results invalid. */
505 void
506 udpif_revalidate(struct udpif *udpif)
507 {
508     seq_change(udpif->reval_seq);
509 }
510
511 /* Returns a seq which increments every time 'udpif' pulls stats from the
512  * datapath.  Callers can use this to get a sense of when might be a good time
513  * to do periodic work which relies on relatively up to date statistics. */
514 struct seq *
515 udpif_dump_seq(struct udpif *udpif)
516 {
517     return udpif->dump_seq;
518 }
519
520 void
521 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
522 {
523     size_t i;
524
525     simap_increase(usage, "handlers", udpif->n_handlers);
526
527     simap_increase(usage, "revalidators", udpif->n_revalidators);
528     for (i = 0; i < N_UMAPS; i++) {
529         simap_increase(usage, "udpif keys", cmap_count(&udpif->ukeys[i].cmap));
530     }
531 }
532
533 /* Remove flows from a single datapath. */
534 void
535 udpif_flush(struct udpif *udpif)
536 {
537     size_t n_handlers, n_revalidators;
538
539     n_handlers = udpif->n_handlers;
540     n_revalidators = udpif->n_revalidators;
541
542     ovsrcu_quiesce_start();
543
544     udpif_stop_threads(udpif);
545     dpif_flow_flush(udpif->dpif);
546     udpif_start_threads(udpif, n_handlers, n_revalidators);
547
548     ovsrcu_quiesce_end();
549 }
550
551 /* Removes all flows from all datapaths. */
552 static void
553 udpif_flush_all_datapaths(void)
554 {
555     struct udpif *udpif;
556
557     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
558         udpif_flush(udpif);
559     }
560 }
561
562 \f
563 static unsigned long
564 udpif_get_n_flows(struct udpif *udpif)
565 {
566     long long int time, now;
567     unsigned long flow_count;
568
569     now = time_msec();
570     atomic_read_relaxed(&udpif->n_flows_timestamp, &time);
571     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
572         struct dpif_dp_stats stats;
573
574         atomic_store_relaxed(&udpif->n_flows_timestamp, now);
575         dpif_get_dp_stats(udpif->dpif, &stats);
576         flow_count = stats.n_flows;
577         atomic_store_relaxed(&udpif->n_flows, flow_count);
578         ovs_mutex_unlock(&udpif->n_flows_mutex);
579     } else {
580         atomic_read_relaxed(&udpif->n_flows, &flow_count);
581     }
582     return flow_count;
583 }
584
585 /* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH
586  * upcalls from dpif, processes the batch and installs corresponding flows
587  * in dpif. */
588 static void *
589 udpif_upcall_handler(void *arg)
590 {
591     struct handler *handler = arg;
592     struct udpif *udpif = handler->udpif;
593
594     while (!latch_is_set(&handler->udpif->exit_latch)) {
595         if (recv_upcalls(handler)) {
596             poll_immediate_wake();
597         } else {
598             dpif_recv_wait(udpif->dpif, handler->handler_id);
599             latch_wait(&udpif->exit_latch);
600         }
601         poll_block();
602     }
603
604     return NULL;
605 }
606
607 static size_t
608 recv_upcalls(struct handler *handler)
609 {
610     struct udpif *udpif = handler->udpif;
611     uint64_t recv_stubs[UPCALL_MAX_BATCH][512 / 8];
612     struct ofpbuf recv_bufs[UPCALL_MAX_BATCH];
613     struct dpif_upcall dupcalls[UPCALL_MAX_BATCH];
614     struct upcall upcalls[UPCALL_MAX_BATCH];
615     struct flow flows[UPCALL_MAX_BATCH];
616     size_t n_upcalls, i;
617
618     n_upcalls = 0;
619     while (n_upcalls < UPCALL_MAX_BATCH) {
620         struct ofpbuf *recv_buf = &recv_bufs[n_upcalls];
621         struct dpif_upcall *dupcall = &dupcalls[n_upcalls];
622         struct upcall *upcall = &upcalls[n_upcalls];
623         struct flow *flow = &flows[n_upcalls];
624         struct pkt_metadata md;
625         int error;
626
627         ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],
628                         sizeof recv_stubs[n_upcalls]);
629         if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {
630             ofpbuf_uninit(recv_buf);
631             break;
632         }
633
634         if (odp_flow_key_to_flow(dupcall->key, dupcall->key_len, flow)
635             == ODP_FIT_ERROR) {
636             goto free_dupcall;
637         }
638
639         error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
640                                dupcall->type, dupcall->userdata, flow);
641         if (error) {
642             if (error == ENODEV) {
643                 /* Received packet on datapath port for which we couldn't
644                  * associate an ofproto.  This can happen if a port is removed
645                  * while traffic is being received.  Print a rate-limited
646                  * message in case it happens frequently. */
647                 dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
648                               dupcall->key_len, NULL, 0, NULL, 0, NULL);
649                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
650                              "port %"PRIu32, flow->in_port.odp_port);
651             }
652             goto free_dupcall;
653         }
654
655         upcall->key = dupcall->key;
656         upcall->key_len = dupcall->key_len;
657
658         upcall->out_tun_key = dupcall->out_tun_key;
659
660         if (vsp_adjust_flow(upcall->ofproto, flow, &dupcall->packet)) {
661             upcall->vsp_adjusted = true;
662         }
663
664         md = pkt_metadata_from_flow(flow);
665         flow_extract(&dupcall->packet, &md, flow);
666
667         error = process_upcall(udpif, upcall, NULL);
668         if (error) {
669             goto cleanup;
670         }
671
672         n_upcalls++;
673         continue;
674
675 cleanup:
676         upcall_uninit(upcall);
677 free_dupcall:
678         ofpbuf_uninit(&dupcall->packet);
679         ofpbuf_uninit(recv_buf);
680     }
681
682     if (n_upcalls) {
683         handle_upcalls(handler->udpif, upcalls, n_upcalls);
684         for (i = 0; i < n_upcalls; i++) {
685             ofpbuf_uninit(&dupcalls[i].packet);
686             ofpbuf_uninit(&recv_bufs[i]);
687             upcall_uninit(&upcalls[i]);
688         }
689     }
690
691     return n_upcalls;
692 }
693
694 static void *
695 udpif_revalidator(void *arg)
696 {
697     /* Used by all revalidators. */
698     struct revalidator *revalidator = arg;
699     struct udpif *udpif = revalidator->udpif;
700     bool leader = revalidator == &udpif->revalidators[0];
701
702     /* Used only by the leader. */
703     long long int start_time = 0;
704     uint64_t last_reval_seq = 0;
705     size_t n_flows = 0;
706
707     revalidator->id = ovsthread_id_self();
708     for (;;) {
709         if (leader) {
710             uint64_t reval_seq;
711
712             reval_seq = seq_read(udpif->reval_seq);
713             last_reval_seq = reval_seq;
714
715             n_flows = udpif_get_n_flows(udpif);
716             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
717             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
718
719             /* Only the leader checks the exit latch to prevent a race where
720              * some threads think it's true and exit and others think it's
721              * false and block indefinitely on the reval_barrier */
722             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
723
724             start_time = time_msec();
725             if (!udpif->reval_exit) {
726                 udpif->dump = dpif_flow_dump_create(udpif->dpif);
727             }
728         }
729
730         /* Wait for the leader to start the flow dump. */
731         ovs_barrier_block(&udpif->reval_barrier);
732         if (udpif->reval_exit) {
733             break;
734         }
735         revalidate(revalidator);
736
737         /* Wait for all flows to have been dumped before we garbage collect. */
738         ovs_barrier_block(&udpif->reval_barrier);
739         revalidator_sweep(revalidator);
740
741         /* Wait for all revalidators to finish garbage collection. */
742         ovs_barrier_block(&udpif->reval_barrier);
743
744         if (leader) {
745             unsigned int flow_limit;
746             long long int duration;
747
748             atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
749
750             dpif_flow_dump_destroy(udpif->dump);
751             seq_change(udpif->dump_seq);
752
753             duration = MAX(time_msec() - start_time, 1);
754             udpif->dump_duration = duration;
755             if (duration > 2000) {
756                 flow_limit /= duration / 1000;
757             } else if (duration > 1300) {
758                 flow_limit = flow_limit * 3 / 4;
759             } else if (duration < 1000 && n_flows > 2000
760                        && flow_limit < n_flows * 1000 / duration) {
761                 flow_limit += 1000;
762             }
763             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
764             atomic_store_relaxed(&udpif->flow_limit, flow_limit);
765
766             if (duration > 2000) {
767                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
768                           duration);
769             }
770
771             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
772             seq_wait(udpif->reval_seq, last_reval_seq);
773             latch_wait(&udpif->exit_latch);
774             poll_block();
775         }
776     }
777
778     return NULL;
779 }
780 \f
781 static enum upcall_type
782 classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata)
783 {
784     union user_action_cookie cookie;
785     size_t userdata_len;
786
787     /* First look at the upcall type. */
788     switch (type) {
789     case DPIF_UC_ACTION:
790         break;
791
792     case DPIF_UC_MISS:
793         return MISS_UPCALL;
794
795     case DPIF_N_UC_TYPES:
796     default:
797         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32, type);
798         return BAD_UPCALL;
799     }
800
801     /* "action" upcalls need a closer look. */
802     if (!userdata) {
803         VLOG_WARN_RL(&rl, "action upcall missing cookie");
804         return BAD_UPCALL;
805     }
806     userdata_len = nl_attr_get_size(userdata);
807     if (userdata_len < sizeof cookie.type
808         || userdata_len > sizeof cookie) {
809         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
810                      userdata_len);
811         return BAD_UPCALL;
812     }
813     memset(&cookie, 0, sizeof cookie);
814     memcpy(&cookie, nl_attr_get(userdata), userdata_len);
815     if (userdata_len == MAX(8, sizeof cookie.sflow)
816         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
817         return SFLOW_UPCALL;
818     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
819                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
820         return MISS_UPCALL;
821     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
822                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
823         return FLOW_SAMPLE_UPCALL;
824     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
825                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
826         return IPFIX_UPCALL;
827     } else {
828         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
829                      " and size %"PRIuSIZE, cookie.type, userdata_len);
830         return BAD_UPCALL;
831     }
832 }
833
834 /* Calculates slow path actions for 'xout'.  'buf' must statically be
835  * initialized with at least 128 bytes of space. */
836 static void
837 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
838                   const struct flow *flow, odp_port_t odp_in_port,
839                   struct ofpbuf *buf)
840 {
841     union user_action_cookie cookie;
842     odp_port_t port;
843     uint32_t pid;
844
845     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
846     cookie.slow_path.unused = 0;
847     cookie.slow_path.reason = xout->slow;
848
849     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
850         ? ODPP_NONE
851         : odp_in_port;
852     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
853     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, ODPP_NONE,
854                              buf);
855 }
856
857 /* If there is no error, the upcall must be destroyed with upcall_uninit()
858  * before quiescing, as the referred objects are guaranteed to exist only
859  * until the calling thread quiesces.  Otherwise, do not call upcall_uninit()
860  * since the 'upcall->put_actions' remains uninitialized. */
861 static int
862 upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
863                const struct ofpbuf *packet, enum dpif_upcall_type type,
864                const struct nlattr *userdata, const struct flow *flow)
865 {
866     int error;
867
868     error = xlate_lookup(backer, flow, &upcall->ofproto, &upcall->ipfix,
869                          &upcall->sflow, NULL, &upcall->in_port);
870     if (error) {
871         return error;
872     }
873
874     upcall->flow = flow;
875     upcall->packet = packet;
876     upcall->type = type;
877     upcall->userdata = userdata;
878     ofpbuf_init(&upcall->put_actions, 0);
879
880     upcall->xout_initialized = false;
881     upcall->vsp_adjusted = false;
882     upcall->ukey_persists = false;
883
884     upcall->ukey = NULL;
885     upcall->key = NULL;
886     upcall->key_len = 0;
887
888     upcall->out_tun_key = NULL;
889
890     return 0;
891 }
892
893 static void
894 upcall_xlate(struct udpif *udpif, struct upcall *upcall,
895              struct ofpbuf *odp_actions)
896 {
897     struct dpif_flow_stats stats;
898     struct xlate_in xin;
899
900     stats.n_packets = 1;
901     stats.n_bytes = ofpbuf_size(upcall->packet);
902     stats.used = time_msec();
903     stats.tcp_flags = ntohs(upcall->flow->tcp_flags);
904
905     xlate_in_init(&xin, upcall->ofproto, upcall->flow, upcall->in_port, NULL,
906                   stats.tcp_flags, upcall->packet);
907     xin.odp_actions = odp_actions;
908
909     if (upcall->type == DPIF_UC_MISS) {
910         xin.resubmit_stats = &stats;
911     } else {
912         /* For non-miss upcalls, there's a flow in the datapath which this
913          * packet was accounted to.  Presumably the revalidators will deal
914          * with pushing its stats eventually. */
915     }
916
917     upcall->dump_seq = seq_read(udpif->dump_seq);
918     upcall->reval_seq = seq_read(udpif->reval_seq);
919     xlate_actions(&xin, &upcall->xout);
920     upcall->xout_initialized = true;
921
922     /* Special case for fail-open mode.
923      *
924      * If we are in fail-open mode, but we are connected to a controller too,
925      * then we should send the packet up to the controller in the hope that it
926      * will try to set up a flow and thereby allow us to exit fail-open.
927      *
928      * See the top-level comment in fail-open.c for more information.
929      *
930      * Copy packets before they are modified by execution. */
931     if (upcall->xout.fail_open) {
932         const struct ofpbuf *packet = upcall->packet;
933         struct ofproto_packet_in *pin;
934
935         pin = xmalloc(sizeof *pin);
936         pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
937         pin->up.packet_len = ofpbuf_size(packet);
938         pin->up.reason = OFPR_NO_MATCH;
939         pin->up.table_id = 0;
940         pin->up.cookie = OVS_BE64_MAX;
941         flow_get_metadata(upcall->flow, &pin->up.fmd);
942         pin->send_len = 0; /* Not used for flow table misses. */
943         pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
944         ofproto_dpif_send_packet_in(upcall->ofproto, pin);
945     }
946
947     if (!upcall->xout.slow) {
948         ofpbuf_use_const(&upcall->put_actions,
949                          ofpbuf_data(upcall->xout.odp_actions),
950                          ofpbuf_size(upcall->xout.odp_actions));
951     } else {
952         ofpbuf_init(&upcall->put_actions, 0);
953         compose_slow_path(udpif, &upcall->xout, upcall->flow,
954                           upcall->flow->in_port.odp_port,
955                           &upcall->put_actions);
956     }
957
958     upcall->ukey = ukey_create_from_upcall(udpif, upcall);
959 }
960
961 static void
962 upcall_uninit(struct upcall *upcall)
963 {
964     if (upcall) {
965         if (upcall->xout_initialized) {
966             xlate_out_uninit(&upcall->xout);
967         }
968         ofpbuf_uninit(&upcall->put_actions);
969         if (!upcall->ukey_persists) {
970             ukey_delete__(upcall->ukey);
971         }
972     }
973 }
974
975 static int
976 upcall_cb(const struct ofpbuf *packet, const struct flow *flow,
977           enum dpif_upcall_type type, const struct nlattr *userdata,
978           struct ofpbuf *actions, struct flow_wildcards *wc,
979           struct ofpbuf *put_actions, void *aux)
980 {
981     struct udpif *udpif = aux;
982     unsigned int flow_limit;
983     struct upcall upcall;
984     bool megaflow;
985     int error;
986
987     atomic_read_relaxed(&enable_megaflows, &megaflow);
988     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
989
990     error = upcall_receive(&upcall, udpif->backer, packet, type, userdata,
991                            flow);
992     if (error) {
993         return error;
994     }
995
996     error = process_upcall(udpif, &upcall, actions);
997     if (error) {
998         goto out;
999     }
1000
1001     if (upcall.xout.slow && put_actions) {
1002         ofpbuf_put(put_actions, ofpbuf_data(&upcall.put_actions),
1003                    ofpbuf_size(&upcall.put_actions));
1004     }
1005
1006     if (OVS_LIKELY(wc)) {
1007         if (megaflow) {
1008             /* XXX: This could be avoided with sufficient API changes. */
1009             *wc = upcall.xout.wc;
1010         } else {
1011             flow_wildcards_init_for_packet(wc, flow);
1012         }
1013     }
1014
1015     if (udpif_get_n_flows(udpif) >= flow_limit) {
1016         error = ENOSPC;
1017         goto out;
1018     }
1019
1020     if (upcall.ukey && !ukey_install(udpif, upcall.ukey)) {
1021         error = ENOSPC;
1022     }
1023
1024 out:
1025     if (!error) {
1026         upcall.ukey_persists = true;
1027     }
1028     upcall_uninit(&upcall);
1029     return error;
1030 }
1031
1032 static int
1033 process_upcall(struct udpif *udpif, struct upcall *upcall,
1034                struct ofpbuf *odp_actions)
1035 {
1036     const struct nlattr *userdata = upcall->userdata;
1037     const struct ofpbuf *packet = upcall->packet;
1038     const struct flow *flow = upcall->flow;
1039
1040     switch (classify_upcall(upcall->type, userdata)) {
1041     case MISS_UPCALL:
1042         upcall_xlate(udpif, upcall, odp_actions);
1043         return 0;
1044
1045     case SFLOW_UPCALL:
1046         if (upcall->sflow) {
1047             union user_action_cookie cookie;
1048
1049             memset(&cookie, 0, sizeof cookie);
1050             memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.sflow);
1051             dpif_sflow_received(upcall->sflow, packet, flow,
1052                                 flow->in_port.odp_port, &cookie);
1053         }
1054         break;
1055
1056     case IPFIX_UPCALL:
1057         if (upcall->ipfix) {
1058             union user_action_cookie cookie;
1059             struct flow_tnl output_tunnel_key;
1060
1061             memset(&cookie, 0, sizeof cookie);
1062             memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.ipfix);
1063
1064             if (upcall->out_tun_key) {
1065                 memset(&output_tunnel_key, 0, sizeof output_tunnel_key);
1066                 odp_tun_key_from_attr(upcall->out_tun_key,
1067                                       &output_tunnel_key);
1068             }
1069             dpif_ipfix_bridge_sample(upcall->ipfix, packet, flow,
1070                                      flow->in_port.odp_port,
1071                                      cookie.ipfix.output_odp_port,
1072                                      upcall->out_tun_key ?
1073                                          &output_tunnel_key : NULL);
1074         }
1075         break;
1076
1077     case FLOW_SAMPLE_UPCALL:
1078         if (upcall->ipfix) {
1079             union user_action_cookie cookie;
1080
1081             memset(&cookie, 0, sizeof cookie);
1082             memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.flow_sample);
1083
1084             /* The flow reflects exactly the contents of the packet.
1085              * Sample the packet using it. */
1086             dpif_ipfix_flow_sample(upcall->ipfix, packet, flow,
1087                                    cookie.flow_sample.collector_set_id,
1088                                    cookie.flow_sample.probability,
1089                                    cookie.flow_sample.obs_domain_id,
1090                                    cookie.flow_sample.obs_point_id);
1091         }
1092         break;
1093
1094     case BAD_UPCALL:
1095         break;
1096     }
1097
1098     return EAGAIN;
1099 }
1100
1101 static void
1102 handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
1103                size_t n_upcalls)
1104 {
1105     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
1106     struct ukey_op ops[UPCALL_MAX_BATCH * 2];
1107     unsigned int flow_limit;
1108     size_t n_ops, n_opsp, i;
1109     bool may_put;
1110     bool megaflow;
1111
1112     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
1113     atomic_read_relaxed(&enable_megaflows, &megaflow);
1114
1115     may_put = udpif_get_n_flows(udpif) < flow_limit;
1116
1117     /* Handle the packets individually in order of arrival.
1118      *
1119      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
1120      *     processes received packets for these protocols.
1121      *
1122      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
1123      *     controller.
1124      *
1125      * The loop fills 'ops' with an array of operations to execute in the
1126      * datapath. */
1127     n_ops = 0;
1128     for (i = 0; i < n_upcalls; i++) {
1129         struct upcall *upcall = &upcalls[i];
1130         const struct ofpbuf *packet = upcall->packet;
1131         struct ukey_op *op;
1132
1133         if (upcall->vsp_adjusted) {
1134             /* This packet was received on a VLAN splinter port.  We added a
1135              * VLAN to the packet to make the packet resemble the flow, but the
1136              * actions were composed assuming that the packet contained no
1137              * VLAN.  So, we must remove the VLAN header from the packet before
1138              * trying to execute the actions. */
1139             if (ofpbuf_size(upcall->xout.odp_actions)) {
1140                 eth_pop_vlan(CONST_CAST(struct ofpbuf *, upcall->packet));
1141             }
1142
1143             /* Remove the flow vlan tags inserted by vlan splinter logic
1144              * to ensure megaflow masks generated match the data path flow. */
1145             CONST_CAST(struct flow *, upcall->flow)->vlan_tci = 0;
1146         }
1147
1148         /* Do not install a flow into the datapath if:
1149          *
1150          *    - The datapath already has too many flows.
1151          *
1152          *    - We received this packet via some flow installed in the kernel
1153          *      already. */
1154         if (may_put && upcall->type == DPIF_UC_MISS) {
1155             struct udpif_key *ukey = upcall->ukey;
1156
1157             upcall->ukey_persists = true;
1158             op = &ops[n_ops++];
1159             op->ukey = ukey;
1160             op->dop.type = DPIF_OP_FLOW_PUT;
1161             op->dop.u.flow_put.flags = DPIF_FP_CREATE;
1162             op->dop.u.flow_put.key = ukey->key;
1163             op->dop.u.flow_put.key_len = ukey->key_len;
1164             op->dop.u.flow_put.mask = ukey->mask;
1165             op->dop.u.flow_put.mask_len = ukey->mask_len;
1166             op->dop.u.flow_put.stats = NULL;
1167             op->dop.u.flow_put.actions = ofpbuf_data(ukey->actions);
1168             op->dop.u.flow_put.actions_len = ofpbuf_size(ukey->actions);
1169         }
1170
1171         if (ofpbuf_size(upcall->xout.odp_actions)) {
1172             op = &ops[n_ops++];
1173             op->ukey = NULL;
1174             op->dop.type = DPIF_OP_EXECUTE;
1175             op->dop.u.execute.packet = CONST_CAST(struct ofpbuf *, packet);
1176             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
1177                                     &op->dop.u.execute.md);
1178             op->dop.u.execute.actions = ofpbuf_data(upcall->xout.odp_actions);
1179             op->dop.u.execute.actions_len = ofpbuf_size(upcall->xout.odp_actions);
1180             op->dop.u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
1181             op->dop.u.execute.probe = false;
1182         }
1183     }
1184
1185     /* Execute batch.
1186      *
1187      * We install ukeys before installing the flows, locking them for exclusive
1188      * access by this thread for the period of installation. This ensures that
1189      * other threads won't attempt to delete the flows as we are creating them.
1190      */
1191     n_opsp = 0;
1192     for (i = 0; i < n_ops; i++) {
1193         struct udpif_key *ukey = ops[i].ukey;
1194
1195         if (ukey) {
1196             /* If we can't install the ukey, don't install the flow. */
1197             if (!ukey_install_start(udpif, ukey)) {
1198                 ukey_delete__(ukey);
1199                 ops[i].ukey = NULL;
1200                 continue;
1201             }
1202         }
1203         opsp[n_opsp++] = &ops[i].dop;
1204     }
1205     dpif_operate(udpif->dpif, opsp, n_opsp);
1206     for (i = 0; i < n_ops; i++) {
1207         if (ops[i].ukey) {
1208             ukey_install_finish(ops[i].ukey, ops[i].dop.error);
1209         }
1210     }
1211 }
1212
1213 static struct udpif_key *
1214 ukey_lookup(struct udpif *udpif, uint32_t hash, const struct nlattr *key,
1215             size_t key_len)
1216 {
1217     struct udpif_key *ukey;
1218     struct cmap *cmap = &udpif->ukeys[hash % N_UMAPS].cmap;
1219
1220     CMAP_FOR_EACH_WITH_HASH (ukey, cmap_node, hash, cmap) {
1221         if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
1222             return ukey;
1223         }
1224     }
1225     return NULL;
1226 }
1227
1228 static struct udpif_key *
1229 ukey_create__(const struct udpif *udpif,
1230               const struct nlattr *key, size_t key_len,
1231               const struct nlattr *mask, size_t mask_len,
1232               const struct ofpbuf *actions,
1233               uint64_t dump_seq, uint64_t reval_seq, long long int used)
1234     OVS_NO_THREAD_SAFETY_ANALYSIS
1235 {
1236     struct udpif_key *ukey = xmalloc(sizeof *ukey);
1237
1238     memcpy(&ukey->keybuf, key, key_len);
1239     ukey->key = &ukey->keybuf.nla;
1240     ukey->key_len = key_len;
1241     memcpy(&ukey->maskbuf, mask, mask_len);
1242     ukey->mask = &ukey->maskbuf.nla;
1243     ukey->mask_len = mask_len;
1244     ukey->hash = hash_bytes(ukey->key, ukey->key_len, udpif->secret);
1245     ukey->actions = ofpbuf_clone(actions);
1246
1247     ovs_mutex_init(&ukey->mutex);
1248     ukey->dump_seq = dump_seq;
1249     ukey->reval_seq = reval_seq;
1250     ukey->flow_exists = false;
1251     ukey->created = time_msec();
1252     memset(&ukey->stats, 0, sizeof ukey->stats);
1253     ukey->stats.used = used;
1254     ukey->xcache = NULL;
1255
1256     return ukey;
1257 }
1258
1259 static struct udpif_key *
1260 ukey_create_from_upcall(const struct udpif *udpif, const struct upcall *upcall)
1261 {
1262     struct odputil_keybuf keystub, maskstub;
1263     struct ofpbuf keybuf, maskbuf;
1264     bool recirc, megaflow;
1265
1266     if (upcall->key_len) {
1267         ofpbuf_use_const(&keybuf, upcall->key, upcall->key_len);
1268     } else {
1269         /* dpif-netdev doesn't provide a netlink-formatted flow key in the
1270          * upcall, so convert the upcall's flow here. */
1271         ofpbuf_use_stack(&keybuf, &keystub, sizeof keystub);
1272         odp_flow_key_from_flow(&keybuf, upcall->flow, &upcall->xout.wc.masks,
1273                                upcall->flow->in_port.odp_port, true);
1274     }
1275
1276     atomic_read_relaxed(&enable_megaflows, &megaflow);
1277     recirc = ofproto_dpif_get_enable_recirc(upcall->ofproto);
1278     ofpbuf_use_stack(&maskbuf, &maskstub, sizeof maskstub);
1279     if (megaflow) {
1280         size_t max_mpls;
1281
1282         max_mpls = ofproto_dpif_get_max_mpls_depth(upcall->ofproto);
1283         odp_flow_key_from_mask(&maskbuf, &upcall->xout.wc.masks, upcall->flow,
1284                                UINT32_MAX, max_mpls, recirc);
1285     }
1286
1287     return ukey_create__(udpif, ofpbuf_data(&keybuf), ofpbuf_size(&keybuf),
1288                          ofpbuf_data(&maskbuf), ofpbuf_size(&maskbuf),
1289                          &upcall->put_actions, upcall->dump_seq,
1290                          upcall->reval_seq, 0);
1291 }
1292
1293 static struct udpif_key *
1294 ukey_create_from_dpif_flow(const struct udpif *udpif,
1295                            const struct dpif_flow *flow)
1296 {
1297     struct ofpbuf actions;
1298     uint64_t dump_seq, reval_seq;
1299
1300     dump_seq = seq_read(udpif->dump_seq);
1301     reval_seq = seq_read(udpif->reval_seq);
1302     ofpbuf_use_const(&actions, &flow->actions, flow->actions_len);
1303     return ukey_create__(udpif, flow->key, flow->key_len,
1304                          flow->mask, flow->mask_len, &actions,
1305                          dump_seq, reval_seq, flow->stats.used);
1306 }
1307
1308 /* Attempts to insert a ukey into the shared ukey maps.
1309  *
1310  * On success, returns true, installs the ukey and returns it in a locked
1311  * state. Otherwise, returns false. */
1312 static bool
1313 ukey_install_start(struct udpif *udpif, struct udpif_key *new_ukey)
1314     OVS_TRY_LOCK(true, new_ukey->mutex)
1315 {
1316     struct umap *umap;
1317     struct udpif_key *old_ukey;
1318     uint32_t idx;
1319     bool locked = false;
1320
1321     idx = new_ukey->hash % N_UMAPS;
1322     umap = &udpif->ukeys[idx];
1323     ovs_mutex_lock(&umap->mutex);
1324     old_ukey = ukey_lookup(udpif, new_ukey->hash, new_ukey->key,
1325                            new_ukey->key_len);
1326     if (old_ukey) {
1327         /* Uncommon case: A ukey is already installed with the same UFID. */
1328         if (old_ukey->key_len == new_ukey->key_len
1329             && !memcmp(old_ukey->key, new_ukey->key, new_ukey->key_len)) {
1330             COVERAGE_INC(handler_duplicate_upcall);
1331         } else {
1332             struct ds ds = DS_EMPTY_INITIALIZER;
1333
1334             odp_flow_key_format(old_ukey->key, old_ukey->key_len, &ds);
1335             ds_put_cstr(&ds, "\n");
1336             odp_flow_key_format(new_ukey->key, new_ukey->key_len, &ds);
1337
1338             VLOG_WARN_RL(&rl, "Conflicting ukey for flows:\n%s", ds_cstr(&ds));
1339             ds_destroy(&ds);
1340         }
1341     } else {
1342         ovs_mutex_lock(&new_ukey->mutex);
1343         cmap_insert(&umap->cmap, &new_ukey->cmap_node, new_ukey->hash);
1344         locked = true;
1345     }
1346     ovs_mutex_unlock(&umap->mutex);
1347
1348     return locked;
1349 }
1350
1351 static void
1352 ukey_install_finish__(struct udpif_key *ukey) OVS_REQUIRES(ukey->mutex)
1353 {
1354     ukey->flow_exists = true;
1355 }
1356
1357 static bool
1358 ukey_install_finish(struct udpif_key *ukey, int error)
1359     OVS_RELEASES(ukey->mutex)
1360 {
1361     if (!error) {
1362         ukey_install_finish__(ukey);
1363     }
1364     ovs_mutex_unlock(&ukey->mutex);
1365
1366     return !error;
1367 }
1368
1369 static bool
1370 ukey_install(struct udpif *udpif, struct udpif_key *ukey)
1371 {
1372     /* The usual way to keep 'ukey->flow_exists' in sync with the datapath is
1373      * to call ukey_install_start(), install the corresponding datapath flow,
1374      * then call ukey_install_finish(). The netdev interface using upcall_cb()
1375      * doesn't provide a function to separately finish the flow installation,
1376      * so we perform the operations together here.
1377      *
1378      * This is fine currently, as revalidator threads will only delete this
1379      * ukey during revalidator_sweep() and only if the dump_seq is mismatched.
1380      * It is unlikely for a revalidator thread to advance dump_seq and reach
1381      * the next GC phase between ukey creation and flow installation. */
1382     return ukey_install_start(udpif, ukey) && ukey_install_finish(ukey, 0);
1383 }
1384
1385 /* Searches for a ukey in 'udpif->ukeys' that matches 'flow' and attempts to
1386  * lock the ukey. If the ukey does not exist, create it.
1387  *
1388  * Returns true on success, setting *result to the matching ukey and returning
1389  * it in a locked state. Otherwise, returns false and clears *result. */
1390 static int
1391 ukey_acquire(struct udpif *udpif, const struct dpif_flow *flow,
1392              struct udpif_key **result)
1393     OVS_TRY_LOCK(true, (*result)->mutex)
1394 {
1395     struct udpif_key *ukey;
1396     uint32_t hash;
1397     bool locked = false;
1398
1399     hash = hash_bytes(flow->key, flow->key_len, udpif->secret);
1400     ukey = ukey_lookup(udpif, hash, flow->key, flow->key_len);
1401     if (ukey) {
1402         if (!ovs_mutex_trylock(&ukey->mutex)) {
1403             locked = true;
1404         }
1405     } else {
1406         bool installed;
1407
1408         /* Usually we try to avoid installing flows from revalidator threads,
1409          * because locking on a umap may cause handler threads to block.
1410          * However there are certain cases, like when ovs-vswitchd is
1411          * restarted, where it is desirable to handle flows that exist in the
1412          * datapath gracefully (ie, don't just clear the datapath). */
1413         ukey = ukey_create_from_dpif_flow(udpif, flow);
1414         installed = ukey_install_start(udpif, ukey);
1415         if (installed) {
1416             ukey_install_finish__(ukey);
1417             locked = true;
1418         } else {
1419             ukey_delete__(ukey);
1420             locked = false;
1421         }
1422     }
1423
1424     if (locked) {
1425         *result = ukey;
1426     } else {
1427         *result = NULL;
1428     }
1429     return locked;
1430 }
1431
1432 static void
1433 ukey_delete__(struct udpif_key *ukey)
1434     OVS_NO_THREAD_SAFETY_ANALYSIS
1435 {
1436     if (ukey) {
1437         xlate_cache_delete(ukey->xcache);
1438         ofpbuf_delete(ukey->actions);
1439         ovs_mutex_destroy(&ukey->mutex);
1440         free(ukey);
1441     }
1442 }
1443
1444 static void
1445 ukey_delete(struct umap *umap, struct udpif_key *ukey)
1446     OVS_REQUIRES(umap->mutex)
1447 {
1448     cmap_remove(&umap->cmap, &ukey->cmap_node, ukey->hash);
1449     ovsrcu_postpone(ukey_delete__, ukey);
1450 }
1451
1452 static bool
1453 should_revalidate(const struct udpif *udpif, uint64_t packets,
1454                   long long int used)
1455 {
1456     long long int metric, now, duration;
1457
1458     if (udpif->dump_duration < 200) {
1459         /* We are likely to handle full revalidation for the flows. */
1460         return true;
1461     }
1462
1463     /* Calculate the mean time between seeing these packets. If this
1464      * exceeds the threshold, then delete the flow rather than performing
1465      * costly revalidation for flows that aren't being hit frequently.
1466      *
1467      * This is targeted at situations where the dump_duration is high (~1s),
1468      * and revalidation is triggered by a call to udpif_revalidate(). In
1469      * these situations, revalidation of all flows causes fluctuations in the
1470      * flow_limit due to the interaction with the dump_duration and max_idle.
1471      * This tends to result in deletion of low-throughput flows anyway, so
1472      * skip the revalidation and just delete those flows. */
1473     packets = MAX(packets, 1);
1474     now = MAX(used, time_msec());
1475     duration = now - used;
1476     metric = duration / packets;
1477
1478     if (metric < 200) {
1479         /* The flow is receiving more than ~5pps, so keep it. */
1480         return true;
1481     }
1482     return false;
1483 }
1484
1485 static bool
1486 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1487                 const struct dpif_flow_stats *stats, uint64_t reval_seq)
1488     OVS_REQUIRES(ukey->mutex)
1489 {
1490     uint64_t slow_path_buf[128 / 8];
1491     struct xlate_out xout, *xoutp;
1492     struct netflow *netflow;
1493     struct ofproto_dpif *ofproto;
1494     struct dpif_flow_stats push;
1495     struct ofpbuf xout_actions;
1496     struct flow flow, dp_mask;
1497     uint32_t *dp32, *xout32;
1498     ofp_port_t ofp_in_port;
1499     struct xlate_in xin;
1500     long long int last_used;
1501     int error;
1502     size_t i;
1503     bool ok;
1504     bool need_revalidate;
1505
1506     ok = false;
1507     xoutp = NULL;
1508     netflow = NULL;
1509
1510     need_revalidate = (ukey->reval_seq != reval_seq);
1511     last_used = ukey->stats.used;
1512     push.used = stats->used;
1513     push.tcp_flags = stats->tcp_flags;
1514     push.n_packets = (stats->n_packets > ukey->stats.n_packets
1515                       ? stats->n_packets - ukey->stats.n_packets
1516                       : 0);
1517     push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
1518                     ? stats->n_bytes - ukey->stats.n_bytes
1519                     : 0);
1520
1521     if (need_revalidate && last_used
1522         && !should_revalidate(udpif, push.n_packets, last_used)) {
1523         ok = false;
1524         goto exit;
1525     }
1526
1527     /* We will push the stats, so update the ukey stats cache. */
1528     ukey->stats = *stats;
1529     if (!push.n_packets && !need_revalidate) {
1530         ok = true;
1531         goto exit;
1532     }
1533
1534     if (ukey->xcache && !need_revalidate) {
1535         xlate_push_stats(ukey->xcache, &push);
1536         ok = true;
1537         goto exit;
1538     }
1539
1540     if (odp_flow_key_to_flow(ukey->key, ukey->key_len, &flow)
1541         == ODP_FIT_ERROR) {
1542         goto exit;
1543     }
1544
1545     error = xlate_lookup(udpif->backer, &flow, &ofproto, NULL, NULL, &netflow,
1546                          &ofp_in_port);
1547     if (error) {
1548         goto exit;
1549     }
1550
1551     if (need_revalidate) {
1552         xlate_cache_clear(ukey->xcache);
1553     }
1554     if (!ukey->xcache) {
1555         ukey->xcache = xlate_cache_new();
1556     }
1557
1558     xlate_in_init(&xin, ofproto, &flow, ofp_in_port, NULL, push.tcp_flags,
1559                   NULL);
1560     if (push.n_packets) {
1561         xin.resubmit_stats = &push;
1562         xin.may_learn = true;
1563     }
1564     xin.xcache = ukey->xcache;
1565     xin.skip_wildcards = !need_revalidate;
1566     xlate_actions(&xin, &xout);
1567     xoutp = &xout;
1568
1569     if (!need_revalidate) {
1570         ok = true;
1571         goto exit;
1572     }
1573
1574     if (!xout.slow) {
1575         ofpbuf_use_const(&xout_actions, ofpbuf_data(xout.odp_actions),
1576                          ofpbuf_size(xout.odp_actions));
1577     } else {
1578         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
1579         compose_slow_path(udpif, &xout, &flow, flow.in_port.odp_port,
1580                           &xout_actions);
1581     }
1582
1583     if (!ofpbuf_equal(&xout_actions, ukey->actions)) {
1584         goto exit;
1585     }
1586
1587     if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, &flow)
1588         == ODP_FIT_ERROR) {
1589         goto exit;
1590     }
1591
1592     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1593      * directly check that the masks are the same.  Instead we check that the
1594      * mask in the kernel is more specific i.e. less wildcarded, than what
1595      * we've calculated here.  This guarantees we don't catch any packets we
1596      * shouldn't with the megaflow. */
1597     dp32 = (uint32_t *) &dp_mask;
1598     xout32 = (uint32_t *) &xout.wc.masks;
1599     for (i = 0; i < FLOW_U32S; i++) {
1600         if ((dp32[i] | xout32[i]) != dp32[i]) {
1601             goto exit;
1602         }
1603     }
1604
1605     ok = true;
1606
1607 exit:
1608     if (ok) {
1609         ukey->reval_seq = reval_seq;
1610     }
1611     if (netflow && !ok) {
1612         netflow_flow_clear(netflow, &flow);
1613     }
1614     xlate_out_uninit(xoutp);
1615     return ok;
1616 }
1617
1618 static void
1619 delete_op_init(struct ukey_op *op, struct udpif_key *ukey)
1620 {
1621     op->ukey = ukey;
1622     op->dop.type = DPIF_OP_FLOW_DEL;
1623     op->dop.u.flow_del.key = ukey->key;
1624     op->dop.u.flow_del.key_len = ukey->key_len;
1625     op->dop.u.flow_del.stats = &op->stats;
1626 }
1627
1628 static void
1629 push_ukey_ops__(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
1630 {
1631     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1632     size_t i;
1633
1634     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1635     for (i = 0; i < n_ops; i++) {
1636         opsp[i] = &ops[i].dop;
1637     }
1638     dpif_operate(udpif->dpif, opsp, n_ops);
1639
1640     for (i = 0; i < n_ops; i++) {
1641         struct ukey_op *op = &ops[i];
1642         struct dpif_flow_stats *push, *stats, push_buf;
1643
1644         stats = op->dop.u.flow_del.stats;
1645         push = &push_buf;
1646
1647         ovs_mutex_lock(&op->ukey->mutex);
1648         push->used = MAX(stats->used, op->ukey->stats.used);
1649         push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1650         push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1651         push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1652         ovs_mutex_unlock(&op->ukey->mutex);
1653
1654         if (push->n_packets || netflow_exists()) {
1655             struct ofproto_dpif *ofproto;
1656             struct netflow *netflow;
1657             ofp_port_t ofp_in_port;
1658             struct flow flow;
1659             int error;
1660
1661             ovs_mutex_lock(&op->ukey->mutex);
1662             if (op->ukey->xcache) {
1663                 xlate_push_stats(op->ukey->xcache, push);
1664                 ovs_mutex_unlock(&op->ukey->mutex);
1665                 continue;
1666             }
1667             ovs_mutex_unlock(&op->ukey->mutex);
1668
1669             if (odp_flow_key_to_flow(op->dop.u.flow_del.key,
1670                                      op->dop.u.flow_del.key_len, &flow)
1671                 == ODP_FIT_ERROR) {
1672                 continue;
1673             }
1674
1675             error = xlate_lookup(udpif->backer, &flow, &ofproto,
1676                                  NULL, NULL, &netflow, &ofp_in_port);
1677             if (!error) {
1678                 struct xlate_in xin;
1679
1680                 xlate_in_init(&xin, ofproto, &flow, ofp_in_port, NULL,
1681                               push->tcp_flags, NULL);
1682                 xin.resubmit_stats = push->n_packets ? push : NULL;
1683                 xin.may_learn = push->n_packets > 0;
1684                 xin.skip_wildcards = true;
1685                 xlate_actions_for_side_effects(&xin);
1686
1687                 if (netflow) {
1688                     netflow_flow_clear(netflow, &flow);
1689                 }
1690             }
1691         }
1692     }
1693 }
1694
1695 static void
1696 push_ukey_ops(struct udpif *udpif, struct umap *umap,
1697               struct ukey_op *ops, size_t n_ops)
1698 {
1699     int i;
1700
1701     push_ukey_ops__(udpif, ops, n_ops);
1702     ovs_mutex_lock(&umap->mutex);
1703     for (i = 0; i < n_ops; i++) {
1704         ukey_delete(umap, ops[i].ukey);
1705     }
1706     ovs_mutex_unlock(&umap->mutex);
1707 }
1708
1709 static void
1710 revalidate(struct revalidator *revalidator)
1711 {
1712     struct udpif *udpif = revalidator->udpif;
1713     struct dpif_flow_dump_thread *dump_thread;
1714     uint64_t dump_seq, reval_seq;
1715     unsigned int flow_limit;
1716
1717     dump_seq = seq_read(udpif->dump_seq);
1718     reval_seq = seq_read(udpif->reval_seq);
1719     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
1720     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
1721     for (;;) {
1722         struct ukey_op ops[REVALIDATE_MAX_BATCH];
1723         int n_ops = 0;
1724
1725         struct dpif_flow flows[REVALIDATE_MAX_BATCH];
1726         const struct dpif_flow *f;
1727         int n_dumped;
1728
1729         long long int max_idle;
1730         long long int now;
1731         size_t n_dp_flows;
1732         bool kill_them_all;
1733
1734         n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
1735         if (!n_dumped) {
1736             break;
1737         }
1738
1739         now = time_msec();
1740
1741         /* In normal operation we want to keep flows around until they have
1742          * been idle for 'ofproto_max_idle' milliseconds.  However:
1743          *
1744          *     - If the number of datapath flows climbs above 'flow_limit',
1745          *       drop that down to 100 ms to try to bring the flows down to
1746          *       the limit.
1747          *
1748          *     - If the number of datapath flows climbs above twice
1749          *       'flow_limit', delete all the datapath flows as an emergency
1750          *       measure.  (We reassess this condition for the next batch of
1751          *       datapath flows, so we will recover before all the flows are
1752          *       gone.) */
1753         n_dp_flows = udpif_get_n_flows(udpif);
1754         kill_them_all = n_dp_flows > flow_limit * 2;
1755         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
1756
1757         for (f = flows; f < &flows[n_dumped]; f++) {
1758             long long int used = f->stats.used;
1759             struct udpif_key *ukey;
1760             bool already_dumped, keep;
1761
1762             if (!ukey_acquire(udpif, f, &ukey)) {
1763                 /* Another thread is processing this flow, so don't bother
1764                  * processing it.*/
1765                 COVERAGE_INC(upcall_ukey_contention);
1766                 continue;
1767             }
1768
1769             already_dumped = ukey->dump_seq == dump_seq;
1770             if (already_dumped) {
1771                 /* The flow has already been handled during this flow dump
1772                  * operation. Skip it. */
1773                 if (ukey->xcache) {
1774                     COVERAGE_INC(dumped_duplicate_flow);
1775                 } else {
1776                     COVERAGE_INC(dumped_new_flow);
1777                 }
1778                 ovs_mutex_unlock(&ukey->mutex);
1779                 continue;
1780             }
1781
1782             if (!used) {
1783                 used = ukey->created;
1784             }
1785             if (kill_them_all || (used && used < now - max_idle)) {
1786                 keep = false;
1787             } else {
1788                 keep = revalidate_ukey(udpif, ukey, &f->stats, reval_seq);
1789             }
1790             ukey->dump_seq = dump_seq;
1791             ukey->flow_exists = keep;
1792
1793             if (!keep) {
1794                 delete_op_init(&ops[n_ops++], ukey);
1795             }
1796             ovs_mutex_unlock(&ukey->mutex);
1797         }
1798
1799         if (n_ops) {
1800             push_ukey_ops__(udpif, ops, n_ops);
1801         }
1802         ovsrcu_quiesce();
1803     }
1804     dpif_flow_dump_thread_destroy(dump_thread);
1805 }
1806
1807 static bool
1808 handle_missed_revalidation(struct udpif *udpif, uint64_t reval_seq,
1809                            struct udpif_key *ukey)
1810 {
1811     struct dpif_flow_stats stats;
1812     bool keep;
1813
1814     COVERAGE_INC(revalidate_missed_dp_flow);
1815
1816     memset(&stats, 0, sizeof stats);
1817     ovs_mutex_lock(&ukey->mutex);
1818     keep = revalidate_ukey(udpif, ukey, &stats, reval_seq);
1819     ovs_mutex_unlock(&ukey->mutex);
1820
1821     return keep;
1822 }
1823
1824 static void
1825 revalidator_sweep__(struct revalidator *revalidator, bool purge)
1826 {
1827     struct udpif *udpif;
1828     uint64_t dump_seq, reval_seq;
1829     int slice;
1830
1831     udpif = revalidator->udpif;
1832     dump_seq = seq_read(udpif->dump_seq);
1833     reval_seq = seq_read(udpif->reval_seq);
1834     slice = revalidator - udpif->revalidators;
1835     ovs_assert(slice < udpif->n_revalidators);
1836
1837     for (int i = slice; i < N_UMAPS; i += udpif->n_revalidators) {
1838         struct ukey_op ops[REVALIDATE_MAX_BATCH];
1839         struct udpif_key *ukey;
1840         struct umap *umap = &udpif->ukeys[i];
1841         size_t n_ops = 0;
1842
1843         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
1844             bool flow_exists, seq_mismatch;
1845
1846             /* Handler threads could be holding a ukey lock while it installs a
1847              * new flow, so don't hang around waiting for access to it. */
1848             if (ovs_mutex_trylock(&ukey->mutex)) {
1849                 continue;
1850             }
1851             flow_exists = ukey->flow_exists;
1852             seq_mismatch = (ukey->dump_seq != dump_seq
1853                             && ukey->reval_seq != reval_seq);
1854             ovs_mutex_unlock(&ukey->mutex);
1855
1856             if (flow_exists
1857                 && (purge
1858                     || (seq_mismatch
1859                         && !handle_missed_revalidation(udpif, reval_seq,
1860                                                        ukey)))) {
1861                 struct ukey_op *op = &ops[n_ops++];
1862
1863                 delete_op_init(op, ukey);
1864                 if (n_ops == REVALIDATE_MAX_BATCH) {
1865                     push_ukey_ops(udpif, umap, ops, n_ops);
1866                     n_ops = 0;
1867                 }
1868             } else if (!flow_exists) {
1869                 ovs_mutex_lock(&umap->mutex);
1870                 ukey_delete(umap, ukey);
1871                 ovs_mutex_unlock(&umap->mutex);
1872             }
1873         }
1874
1875         if (n_ops) {
1876             push_ukey_ops(udpif, umap, ops, n_ops);
1877         }
1878         ovsrcu_quiesce();
1879     }
1880 }
1881
1882 static void
1883 revalidator_sweep(struct revalidator *revalidator)
1884 {
1885     revalidator_sweep__(revalidator, false);
1886 }
1887
1888 static void
1889 revalidator_purge(struct revalidator *revalidator)
1890 {
1891     revalidator_sweep__(revalidator, true);
1892 }
1893 \f
1894 static void
1895 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
1896                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
1897 {
1898     struct ds ds = DS_EMPTY_INITIALIZER;
1899     struct udpif *udpif;
1900
1901     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1902         unsigned int flow_limit;
1903         size_t i;
1904
1905         atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
1906
1907         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
1908         ds_put_format(&ds, "\tflows         : (current %lu)"
1909             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
1910             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
1911         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
1912         ds_put_char(&ds, '\n');
1913
1914         for (i = 0; i < n_revalidators; i++) {
1915             struct revalidator *revalidator = &udpif->revalidators[i];
1916             int j, elements = 0;
1917
1918             for (j = i; j < N_UMAPS; j += n_revalidators) {
1919                 elements += cmap_count(&udpif->ukeys[j].cmap);
1920             }
1921             ds_put_format(&ds, "\t%u: (keys %d)\n", revalidator->id, elements);
1922         }
1923     }
1924
1925     unixctl_command_reply(conn, ds_cstr(&ds));
1926     ds_destroy(&ds);
1927 }
1928
1929 /* Disable using the megaflows.
1930  *
1931  * This command is only needed for advanced debugging, so it's not
1932  * documented in the man page. */
1933 static void
1934 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
1935                                  int argc OVS_UNUSED,
1936                                  const char *argv[] OVS_UNUSED,
1937                                  void *aux OVS_UNUSED)
1938 {
1939     atomic_store_relaxed(&enable_megaflows, false);
1940     udpif_flush_all_datapaths();
1941     unixctl_command_reply(conn, "megaflows disabled");
1942 }
1943
1944 /* Re-enable using megaflows.
1945  *
1946  * This command is only needed for advanced debugging, so it's not
1947  * documented in the man page. */
1948 static void
1949 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
1950                                 int argc OVS_UNUSED,
1951                                 const char *argv[] OVS_UNUSED,
1952                                 void *aux OVS_UNUSED)
1953 {
1954     atomic_store_relaxed(&enable_megaflows, true);
1955     udpif_flush_all_datapaths();
1956     unixctl_command_reply(conn, "megaflows enabled");
1957 }
1958
1959 /* Set the flow limit.
1960  *
1961  * This command is only needed for advanced debugging, so it's not
1962  * documented in the man page. */
1963 static void
1964 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
1965                               int argc OVS_UNUSED,
1966                               const char *argv[] OVS_UNUSED,
1967                               void *aux OVS_UNUSED)
1968 {
1969     struct ds ds = DS_EMPTY_INITIALIZER;
1970     struct udpif *udpif;
1971     unsigned int flow_limit = atoi(argv[1]);
1972
1973     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1974         atomic_store_relaxed(&udpif->flow_limit, flow_limit);
1975     }
1976     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
1977     unixctl_command_reply(conn, ds_cstr(&ds));
1978     ds_destroy(&ds);
1979 }
1980
1981 static void
1982 upcall_unixctl_dump_wait(struct unixctl_conn *conn,
1983                          int argc OVS_UNUSED,
1984                          const char *argv[] OVS_UNUSED,
1985                          void *aux OVS_UNUSED)
1986 {
1987     if (list_is_singleton(&all_udpifs)) {
1988         struct udpif *udpif = NULL;
1989         size_t len;
1990
1991         udpif = OBJECT_CONTAINING(list_front(&all_udpifs), udpif, list_node);
1992         len = (udpif->n_conns + 1) * sizeof *udpif->conns;
1993         udpif->conn_seq = seq_read(udpif->dump_seq);
1994         udpif->conns = xrealloc(udpif->conns, len);
1995         udpif->conns[udpif->n_conns++] = conn;
1996     } else {
1997         unixctl_command_reply_error(conn, "can't wait on multiple udpifs.");
1998     }
1999 }
2000
2001 static void
2002 upcall_unixctl_purge(struct unixctl_conn *conn, int argc OVS_UNUSED,
2003                      const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
2004 {
2005     struct udpif *udpif;
2006
2007     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
2008         int n;
2009
2010         for (n = 0; n < udpif->n_revalidators; n++) {
2011             revalidator_purge(&udpif->revalidators[n]);
2012         }
2013     }
2014     unixctl_command_reply(conn, "");
2015 }