ofproto-dpif: Do not block on uninitialized pause barriers.
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "cmap.h"
25 #include "dpif.h"
26 #include "dynamic-string.h"
27 #include "fail-open.h"
28 #include "guarded-list.h"
29 #include "latch.h"
30 #include "list.h"
31 #include "netlink.h"
32 #include "ofpbuf.h"
33 #include "ofproto-dpif-ipfix.h"
34 #include "ofproto-dpif-sflow.h"
35 #include "ofproto-dpif-xlate.h"
36 #include "ovs-rcu.h"
37 #include "packets.h"
38 #include "poll-loop.h"
39 #include "seq.h"
40 #include "unixctl.h"
41 #include "openvswitch/vlog.h"
42
43 #define MAX_QUEUE_LENGTH 512
44 #define UPCALL_MAX_BATCH 64
45 #define REVALIDATE_MAX_BATCH 50
46
47 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
48
49 COVERAGE_DEFINE(dumped_duplicate_flow);
50 COVERAGE_DEFINE(dumped_new_flow);
51 COVERAGE_DEFINE(handler_duplicate_upcall);
52 COVERAGE_DEFINE(upcall_ukey_contention);
53 COVERAGE_DEFINE(revalidate_missed_dp_flow);
54
55 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
56  * and possibly sets up a kernel flow as a cache. */
57 struct handler {
58     struct udpif *udpif;               /* Parent udpif. */
59     pthread_t thread;                  /* Thread ID. */
60     uint32_t handler_id;               /* Handler id. */
61 };
62
63 /* In the absence of a multiple-writer multiple-reader datastructure for
64  * storing ukeys, we use a large number of cmaps, each with its own lock for
65  * writing. */
66 #define N_UMAPS 512 /* per udpif. */
67 struct umap {
68     struct ovs_mutex mutex;            /* Take for writing to the following. */
69     struct cmap cmap;                  /* Datapath flow keys. */
70 };
71
72 /* A thread that processes datapath flows, updates OpenFlow statistics, and
73  * updates or removes them if necessary. */
74 struct revalidator {
75     struct udpif *udpif;               /* Parent udpif. */
76     pthread_t thread;                  /* Thread ID. */
77     unsigned int id;                   /* ovsthread_id_self(). */
78 };
79
80 /* An upcall handler for ofproto_dpif.
81  *
82  * udpif keeps records of two kind of logically separate units:
83  *
84  * upcall handling
85  * ---------------
86  *
87  *    - An array of 'struct handler's for upcall handling and flow
88  *      installation.
89  *
90  * flow revalidation
91  * -----------------
92  *
93  *    - Revalidation threads which read the datapath flow table and maintains
94  *      them.
95  */
96 struct udpif {
97     struct ovs_list list_node;         /* In all_udpifs list. */
98
99     struct dpif *dpif;                 /* Datapath handle. */
100     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
101
102     struct handler *handlers;          /* Upcall handlers. */
103     size_t n_handlers;
104
105     struct revalidator *revalidators;  /* Flow revalidators. */
106     size_t n_revalidators;
107
108     struct latch exit_latch;           /* Tells child threads to exit. */
109
110     /* Revalidation. */
111     struct seq *reval_seq;             /* Incremented to force revalidation. */
112     bool reval_exit;                   /* Set by leader on 'exit_latch. */
113     struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
114     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
115     long long int dump_duration;       /* Duration of the last flow dump. */
116     struct seq *dump_seq;              /* Increments each dump iteration. */
117     atomic_bool enable_ufid;           /* If true, skip dumping flow attrs. */
118
119     /* These variables provide a mechanism for the main thread to pause
120      * all revalidation without having to completely shut the threads down.
121      * 'pause_latch' is shared between the main thread and the lead
122      * revalidator thread, so when it is desirable to halt revalidation, the
123      * main thread will set the latch. 'pause' and 'pause_barrier' are shared
124      * by revalidator threads. The lead revalidator will set 'pause' when it
125      * observes the latch has been set, and this will cause all revalidator
126      * threads to wait on 'pause_barrier' at the beginning of the next
127      * revalidation round. */
128     bool pause;                        /* Set by leader on 'pause_latch. */
129     struct latch pause_latch;          /* Set to force revalidators pause. */
130     struct ovs_barrier pause_barrier;  /* Barrier used to pause all */
131                                        /* revalidators by main thread. */
132
133     /* There are 'N_UMAPS' maps containing 'struct udpif_key' elements.
134      *
135      * During the flow dump phase, revalidators insert into these with a random
136      * distribution. During the garbage collection phase, each revalidator
137      * takes care of garbage collecting a slice of these maps. */
138     struct umap *ukeys;
139
140     /* Datapath flow statistics. */
141     unsigned int max_n_flows;
142     unsigned int avg_n_flows;
143
144     /* Following fields are accessed and modified by different threads. */
145     atomic_uint flow_limit;            /* Datapath flow hard limit. */
146
147     /* n_flows_mutex prevents multiple threads updating these concurrently. */
148     atomic_uint n_flows;               /* Number of flows in the datapath. */
149     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
150     struct ovs_mutex n_flows_mutex;
151
152     /* Following fields are accessed and modified only from the main thread. */
153     struct unixctl_conn **conns;       /* Connections waiting on dump_seq. */
154     uint64_t conn_seq;                 /* Corresponds to 'dump_seq' when
155                                           conns[n_conns-1] was stored. */
156     size_t n_conns;                    /* Number of connections waiting. */
157 };
158
159 enum upcall_type {
160     BAD_UPCALL,                 /* Some kind of bug somewhere. */
161     MISS_UPCALL,                /* A flow miss.  */
162     SFLOW_UPCALL,               /* sFlow sample. */
163     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
164     IPFIX_UPCALL                /* Per-bridge sampling. */
165 };
166
167 enum reval_result {
168     UKEY_KEEP,
169     UKEY_DELETE,
170     UKEY_MODIFY
171 };
172
173 struct upcall {
174     struct ofproto_dpif *ofproto;  /* Parent ofproto. */
175     const struct recirc_id_node *recirc; /* Recirculation context. */
176     bool have_recirc_ref;                /* Reference held on recirc ctx? */
177
178     /* The flow and packet are only required to be constant when using
179      * dpif-netdev.  If a modification is absolutely necessary, a const cast
180      * may be used with other datapaths. */
181     const struct flow *flow;       /* Parsed representation of the packet. */
182     const ovs_u128 *ufid;          /* Unique identifier for 'flow'. */
183     unsigned pmd_id;               /* Datapath poll mode driver id. */
184     const struct dp_packet *packet;   /* Packet associated with this upcall. */
185     ofp_port_t in_port;            /* OpenFlow in port, or OFPP_NONE. */
186
187     enum dpif_upcall_type type;    /* Datapath type of the upcall. */
188     const struct nlattr *userdata; /* Userdata for DPIF_UC_ACTION Upcalls. */
189     const struct nlattr *actions;  /* Flow actions in DPIF_UC_ACTION Upcalls. */
190
191     bool xout_initialized;         /* True if 'xout' must be uninitialized. */
192     struct xlate_out xout;         /* Result of xlate_actions(). */
193     struct ofpbuf odp_actions;     /* Datapath actions from xlate_actions(). */
194     struct flow_wildcards wc;      /* Dependencies that megaflow must match. */
195     struct ofpbuf put_actions;     /* Actions 'put' in the fastpath. */
196
197     struct dpif_ipfix *ipfix;      /* IPFIX pointer or NULL. */
198     struct dpif_sflow *sflow;      /* SFlow pointer or NULL. */
199
200     bool vsp_adjusted;             /* 'packet' and 'flow' were adjusted for
201                                       VLAN splinters if true. */
202
203     struct udpif_key *ukey;        /* Revalidator flow cache. */
204     bool ukey_persists;            /* Set true to keep 'ukey' beyond the
205                                       lifetime of this upcall. */
206
207     uint64_t dump_seq;             /* udpif->dump_seq at translation time. */
208     uint64_t reval_seq;            /* udpif->reval_seq at translation time. */
209
210     /* Not used by the upcall callback interface. */
211     const struct nlattr *key;      /* Datapath flow key. */
212     size_t key_len;                /* Datapath flow key length. */
213     const struct nlattr *out_tun_key;  /* Datapath output tunnel key. */
214
215     uint64_t odp_actions_stub[1024 / 8]; /* Stub for odp_actions. */
216 };
217
218 /* 'udpif_key's are responsible for tracking the little bit of state udpif
219  * needs to do flow expiration which can't be pulled directly from the
220  * datapath.  They may be created by any handler or revalidator thread at any
221  * time, and read by any revalidator during the dump phase. They are however
222  * each owned by a single revalidator which takes care of destroying them
223  * during the garbage-collection phase.
224  *
225  * The mutex within the ukey protects some members of the ukey. The ukey
226  * itself is protected by RCU and is held within a umap in the parent udpif.
227  * Adding or removing a ukey from a umap is only safe when holding the
228  * corresponding umap lock. */
229 struct udpif_key {
230     struct cmap_node cmap_node;     /* In parent revalidator 'ukeys' map. */
231
232     /* These elements are read only once created, and therefore aren't
233      * protected by a mutex. */
234     const struct nlattr *key;      /* Datapath flow key. */
235     size_t key_len;                /* Length of 'key'. */
236     const struct nlattr *mask;     /* Datapath flow mask. */
237     size_t mask_len;               /* Length of 'mask'. */
238     ovs_u128 ufid;                 /* Unique flow identifier. */
239     bool ufid_present;             /* True if 'ufid' is in datapath. */
240     uint32_t hash;                 /* Pre-computed hash for 'key'. */
241     unsigned pmd_id;               /* Datapath poll mode driver id. */
242
243     struct ovs_mutex mutex;                   /* Guards the following. */
244     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
245     long long int created OVS_GUARDED;        /* Estimate of creation time. */
246     uint64_t dump_seq OVS_GUARDED;            /* Tracks udpif->dump_seq. */
247     uint64_t reval_seq OVS_GUARDED;           /* Tracks udpif->reval_seq. */
248     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
249                                                  once. */
250     /* Datapath flow actions as nlattrs.  Protected by RCU.  Read with
251      * ukey_get_actions(), and write with ukey_set_actions(). */
252     OVSRCU_TYPE(struct ofpbuf *) actions;
253
254     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
255                                                * are affected by this ukey.
256                                                * Used for stats and learning.*/
257     union {
258         struct odputil_keybuf buf;
259         struct nlattr nla;
260     } keybuf, maskbuf;
261
262     /* Recirculation IDs with references held by the ukey. */
263     unsigned n_recircs;
264     uint32_t recircs[];   /* 'n_recircs' id's for which references are held. */
265 };
266
267 /* Datapath operation with optional ukey attached. */
268 struct ukey_op {
269     struct udpif_key *ukey;
270     struct dpif_flow_stats stats; /* Stats for 'op'. */
271     struct dpif_op dop;           /* Flow operation. */
272 };
273
274 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
275 static struct ovs_list all_udpifs = OVS_LIST_INITIALIZER(&all_udpifs);
276
277 static size_t recv_upcalls(struct handler *);
278 static int process_upcall(struct udpif *, struct upcall *,
279                           struct ofpbuf *odp_actions, struct flow_wildcards *);
280 static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
281 static void udpif_stop_threads(struct udpif *);
282 static void udpif_start_threads(struct udpif *, size_t n_handlers,
283                                 size_t n_revalidators);
284 static void udpif_pause_revalidators(struct udpif *);
285 static void udpif_resume_revalidators(struct udpif *);
286 static void *udpif_upcall_handler(void *);
287 static void *udpif_revalidator(void *);
288 static unsigned long udpif_get_n_flows(struct udpif *);
289 static void revalidate(struct revalidator *);
290 static void revalidator_pause(struct revalidator *);
291 static void revalidator_sweep(struct revalidator *);
292 static void revalidator_purge(struct revalidator *);
293 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
294                                 const char *argv[], void *aux);
295 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
296                                              const char *argv[], void *aux);
297 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
298                                             const char *argv[], void *aux);
299 static void upcall_unixctl_disable_ufid(struct unixctl_conn *, int argc,
300                                               const char *argv[], void *aux);
301 static void upcall_unixctl_enable_ufid(struct unixctl_conn *, int argc,
302                                              const char *argv[], void *aux);
303 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
304                                             const char *argv[], void *aux);
305 static void upcall_unixctl_dump_wait(struct unixctl_conn *conn, int argc,
306                                      const char *argv[], void *aux);
307 static void upcall_unixctl_purge(struct unixctl_conn *conn, int argc,
308                                  const char *argv[], void *aux);
309
310 static struct udpif_key *ukey_create_from_upcall(struct upcall *,
311                                                  struct flow_wildcards *);
312 static int ukey_create_from_dpif_flow(const struct udpif *,
313                                       const struct dpif_flow *,
314                                       struct udpif_key **);
315 static void ukey_get_actions(struct udpif_key *, const struct nlattr **actions,
316                              size_t *size);
317 static bool ukey_install_start(struct udpif *, struct udpif_key *ukey);
318 static bool ukey_install_finish(struct udpif_key *ukey, int error);
319 static bool ukey_install(struct udpif *udpif, struct udpif_key *ukey);
320 static struct udpif_key *ukey_lookup(struct udpif *udpif,
321                                      const ovs_u128 *ufid);
322 static int ukey_acquire(struct udpif *, const struct dpif_flow *,
323                         struct udpif_key **result, int *error);
324 static void ukey_delete__(struct udpif_key *);
325 static void ukey_delete(struct umap *, struct udpif_key *);
326 static enum upcall_type classify_upcall(enum dpif_upcall_type type,
327                                         const struct nlattr *userdata);
328
329 static int upcall_receive(struct upcall *, const struct dpif_backer *,
330                           const struct dp_packet *packet, enum dpif_upcall_type,
331                           const struct nlattr *userdata, const struct flow *,
332                           const ovs_u128 *ufid, const unsigned pmd_id);
333 static void upcall_uninit(struct upcall *);
334
335 static upcall_callback upcall_cb;
336 static dp_purge_callback dp_purge_cb;
337
338 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
339 static atomic_bool enable_ufid = ATOMIC_VAR_INIT(true);
340
341 void
342 udpif_init(void)
343 {
344     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
345     if (ovsthread_once_start(&once)) {
346         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
347                                  NULL);
348         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
349                                  upcall_unixctl_disable_megaflows, NULL);
350         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
351                                  upcall_unixctl_enable_megaflows, NULL);
352         unixctl_command_register("upcall/disable-ufid", "", 0, 0,
353                                  upcall_unixctl_disable_ufid, NULL);
354         unixctl_command_register("upcall/enable-ufid", "", 0, 0,
355                                  upcall_unixctl_enable_ufid, NULL);
356         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
357                                  upcall_unixctl_set_flow_limit, NULL);
358         unixctl_command_register("revalidator/wait", "", 0, 0,
359                                  upcall_unixctl_dump_wait, NULL);
360         unixctl_command_register("revalidator/purge", "", 0, 0,
361                                  upcall_unixctl_purge, NULL);
362         ovsthread_once_done(&once);
363     }
364 }
365
366 struct udpif *
367 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
368 {
369     struct udpif *udpif = xzalloc(sizeof *udpif);
370
371     udpif->dpif = dpif;
372     udpif->backer = backer;
373     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
374     udpif->reval_seq = seq_create();
375     udpif->dump_seq = seq_create();
376     latch_init(&udpif->exit_latch);
377     latch_init(&udpif->pause_latch);
378     list_push_back(&all_udpifs, &udpif->list_node);
379     atomic_init(&udpif->enable_ufid, false);
380     atomic_init(&udpif->n_flows, 0);
381     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
382     ovs_mutex_init(&udpif->n_flows_mutex);
383     udpif->ukeys = xmalloc(N_UMAPS * sizeof *udpif->ukeys);
384     for (int i = 0; i < N_UMAPS; i++) {
385         cmap_init(&udpif->ukeys[i].cmap);
386         ovs_mutex_init(&udpif->ukeys[i].mutex);
387     }
388
389     dpif_register_upcall_cb(dpif, upcall_cb, udpif);
390     dpif_register_dp_purge_cb(dpif, dp_purge_cb, udpif);
391
392     return udpif;
393 }
394
395 void
396 udpif_run(struct udpif *udpif)
397 {
398     if (udpif->conns && udpif->conn_seq != seq_read(udpif->dump_seq)) {
399         int i;
400
401         for (i = 0; i < udpif->n_conns; i++) {
402             unixctl_command_reply(udpif->conns[i], NULL);
403         }
404         free(udpif->conns);
405         udpif->conns = NULL;
406         udpif->n_conns = 0;
407     }
408 }
409
410 void
411 udpif_destroy(struct udpif *udpif)
412 {
413     udpif_stop_threads(udpif);
414
415     for (int i = 0; i < N_UMAPS; i++) {
416         cmap_destroy(&udpif->ukeys[i].cmap);
417         ovs_mutex_destroy(&udpif->ukeys[i].mutex);
418     }
419     free(udpif->ukeys);
420     udpif->ukeys = NULL;
421
422     list_remove(&udpif->list_node);
423     latch_destroy(&udpif->exit_latch);
424     latch_destroy(&udpif->pause_latch);
425     seq_destroy(udpif->reval_seq);
426     seq_destroy(udpif->dump_seq);
427     ovs_mutex_destroy(&udpif->n_flows_mutex);
428     free(udpif);
429 }
430
431 /* Stops the handler and revalidator threads, must be enclosed in
432  * ovsrcu quiescent state unless when destroying udpif. */
433 static void
434 udpif_stop_threads(struct udpif *udpif)
435 {
436     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
437         size_t i;
438
439         latch_set(&udpif->exit_latch);
440
441         for (i = 0; i < udpif->n_handlers; i++) {
442             struct handler *handler = &udpif->handlers[i];
443
444             xpthread_join(handler->thread, NULL);
445         }
446
447         for (i = 0; i < udpif->n_revalidators; i++) {
448             xpthread_join(udpif->revalidators[i].thread, NULL);
449         }
450
451         dpif_disable_upcall(udpif->dpif);
452
453         for (i = 0; i < udpif->n_revalidators; i++) {
454             struct revalidator *revalidator = &udpif->revalidators[i];
455
456             /* Delete ukeys, and delete all flows from the datapath to prevent
457              * double-counting stats. */
458             revalidator_purge(revalidator);
459         }
460
461         latch_poll(&udpif->exit_latch);
462
463         ovs_barrier_destroy(&udpif->reval_barrier);
464         ovs_barrier_destroy(&udpif->pause_barrier);
465
466         free(udpif->revalidators);
467         udpif->revalidators = NULL;
468         udpif->n_revalidators = 0;
469
470         free(udpif->handlers);
471         udpif->handlers = NULL;
472         udpif->n_handlers = 0;
473     }
474 }
475
476 /* Starts the handler and revalidator threads, must be enclosed in
477  * ovsrcu quiescent state. */
478 static void
479 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
480                     size_t n_revalidators)
481 {
482     if (udpif && n_handlers && n_revalidators) {
483         size_t i;
484         bool enable_ufid;
485
486         udpif->n_handlers = n_handlers;
487         udpif->n_revalidators = n_revalidators;
488
489         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
490         for (i = 0; i < udpif->n_handlers; i++) {
491             struct handler *handler = &udpif->handlers[i];
492
493             handler->udpif = udpif;
494             handler->handler_id = i;
495             handler->thread = ovs_thread_create(
496                 "handler", udpif_upcall_handler, handler);
497         }
498
499         enable_ufid = ofproto_dpif_get_enable_ufid(udpif->backer);
500         atomic_init(&udpif->enable_ufid, enable_ufid);
501         dpif_enable_upcall(udpif->dpif);
502
503         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
504         ovs_barrier_init(&udpif->pause_barrier, udpif->n_revalidators + 1);
505         udpif->reval_exit = false;
506         udpif->pause = false;
507         udpif->revalidators = xzalloc(udpif->n_revalidators
508                                       * sizeof *udpif->revalidators);
509         for (i = 0; i < udpif->n_revalidators; i++) {
510             struct revalidator *revalidator = &udpif->revalidators[i];
511
512             revalidator->udpif = udpif;
513             revalidator->thread = ovs_thread_create(
514                 "revalidator", udpif_revalidator, revalidator);
515         }
516     }
517 }
518
519 /* Pauses all revalidators.  Should only be called by the main thread.
520  * When function returns, all revalidators are paused and will proceed
521  * only after udpif_resume_revalidators() is called. */
522 static void
523 udpif_pause_revalidators(struct udpif *udpif)
524 {
525     if (ofproto_dpif_backer_enabled(udpif->backer)) {
526         latch_set(&udpif->pause_latch);
527         ovs_barrier_block(&udpif->pause_barrier);
528     }
529 }
530
531 /* Resumes the pausing of revalidators.  Should only be called by the
532  * main thread. */
533 static void
534 udpif_resume_revalidators(struct udpif *udpif)
535 {
536     if (ofproto_dpif_backer_enabled(udpif->backer)) {
537         latch_poll(&udpif->pause_latch);
538         ovs_barrier_block(&udpif->pause_barrier);
539     }
540 }
541
542 /* Tells 'udpif' how many threads it should use to handle upcalls.
543  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
544  * datapath handle must have packet reception enabled before starting
545  * threads. */
546 void
547 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
548                   size_t n_revalidators)
549 {
550     ovs_assert(udpif);
551     ovs_assert(n_handlers && n_revalidators);
552
553     ovsrcu_quiesce_start();
554     if (udpif->n_handlers != n_handlers
555         || udpif->n_revalidators != n_revalidators) {
556         udpif_stop_threads(udpif);
557     }
558
559     if (!udpif->handlers && !udpif->revalidators) {
560         int error;
561
562         error = dpif_handlers_set(udpif->dpif, n_handlers);
563         if (error) {
564             VLOG_ERR("failed to configure handlers in dpif %s: %s",
565                      dpif_name(udpif->dpif), ovs_strerror(error));
566             return;
567         }
568
569         udpif_start_threads(udpif, n_handlers, n_revalidators);
570     }
571     ovsrcu_quiesce_end();
572 }
573
574 /* Waits for all ongoing upcall translations to complete.  This ensures that
575  * there are no transient references to any removed ofprotos (or other
576  * objects).  In particular, this should be called after an ofproto is removed
577  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
578 void
579 udpif_synchronize(struct udpif *udpif)
580 {
581     /* This is stronger than necessary.  It would be sufficient to ensure
582      * (somehow) that each handler and revalidator thread had passed through
583      * its main loop once. */
584     size_t n_handlers = udpif->n_handlers;
585     size_t n_revalidators = udpif->n_revalidators;
586
587     ovsrcu_quiesce_start();
588     udpif_stop_threads(udpif);
589     udpif_start_threads(udpif, n_handlers, n_revalidators);
590     ovsrcu_quiesce_end();
591 }
592
593 /* Notifies 'udpif' that something changed which may render previous
594  * xlate_actions() results invalid. */
595 void
596 udpif_revalidate(struct udpif *udpif)
597 {
598     seq_change(udpif->reval_seq);
599 }
600
601 /* Returns a seq which increments every time 'udpif' pulls stats from the
602  * datapath.  Callers can use this to get a sense of when might be a good time
603  * to do periodic work which relies on relatively up to date statistics. */
604 struct seq *
605 udpif_dump_seq(struct udpif *udpif)
606 {
607     return udpif->dump_seq;
608 }
609
610 void
611 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
612 {
613     size_t i;
614
615     simap_increase(usage, "handlers", udpif->n_handlers);
616
617     simap_increase(usage, "revalidators", udpif->n_revalidators);
618     for (i = 0; i < N_UMAPS; i++) {
619         simap_increase(usage, "udpif keys", cmap_count(&udpif->ukeys[i].cmap));
620     }
621 }
622
623 /* Remove flows from a single datapath. */
624 void
625 udpif_flush(struct udpif *udpif)
626 {
627     size_t n_handlers, n_revalidators;
628
629     n_handlers = udpif->n_handlers;
630     n_revalidators = udpif->n_revalidators;
631
632     ovsrcu_quiesce_start();
633
634     udpif_stop_threads(udpif);
635     dpif_flow_flush(udpif->dpif);
636     udpif_start_threads(udpif, n_handlers, n_revalidators);
637
638     ovsrcu_quiesce_end();
639 }
640
641 /* Removes all flows from all datapaths. */
642 static void
643 udpif_flush_all_datapaths(void)
644 {
645     struct udpif *udpif;
646
647     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
648         udpif_flush(udpif);
649     }
650 }
651
652 static bool
653 udpif_use_ufid(struct udpif *udpif)
654 {
655     bool enable;
656
657     atomic_read_relaxed(&enable_ufid, &enable);
658     return enable && ofproto_dpif_get_enable_ufid(udpif->backer);
659 }
660
661 \f
662 static unsigned long
663 udpif_get_n_flows(struct udpif *udpif)
664 {
665     long long int time, now;
666     unsigned long flow_count;
667
668     now = time_msec();
669     atomic_read_relaxed(&udpif->n_flows_timestamp, &time);
670     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
671         struct dpif_dp_stats stats;
672
673         atomic_store_relaxed(&udpif->n_flows_timestamp, now);
674         dpif_get_dp_stats(udpif->dpif, &stats);
675         flow_count = stats.n_flows;
676         atomic_store_relaxed(&udpif->n_flows, flow_count);
677         ovs_mutex_unlock(&udpif->n_flows_mutex);
678     } else {
679         atomic_read_relaxed(&udpif->n_flows, &flow_count);
680     }
681     return flow_count;
682 }
683
684 /* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH
685  * upcalls from dpif, processes the batch and installs corresponding flows
686  * in dpif. */
687 static void *
688 udpif_upcall_handler(void *arg)
689 {
690     struct handler *handler = arg;
691     struct udpif *udpif = handler->udpif;
692
693     while (!latch_is_set(&handler->udpif->exit_latch)) {
694         if (recv_upcalls(handler)) {
695             poll_immediate_wake();
696         } else {
697             dpif_recv_wait(udpif->dpif, handler->handler_id);
698             latch_wait(&udpif->exit_latch);
699         }
700         poll_block();
701     }
702
703     return NULL;
704 }
705
706 static size_t
707 recv_upcalls(struct handler *handler)
708 {
709     struct udpif *udpif = handler->udpif;
710     uint64_t recv_stubs[UPCALL_MAX_BATCH][512 / 8];
711     struct ofpbuf recv_bufs[UPCALL_MAX_BATCH];
712     struct dpif_upcall dupcalls[UPCALL_MAX_BATCH];
713     struct upcall upcalls[UPCALL_MAX_BATCH];
714     struct flow flows[UPCALL_MAX_BATCH];
715     size_t n_upcalls, i;
716
717     n_upcalls = 0;
718     while (n_upcalls < UPCALL_MAX_BATCH) {
719         struct ofpbuf *recv_buf = &recv_bufs[n_upcalls];
720         struct dpif_upcall *dupcall = &dupcalls[n_upcalls];
721         struct upcall *upcall = &upcalls[n_upcalls];
722         struct flow *flow = &flows[n_upcalls];
723         int error;
724
725         ofpbuf_use_stub(recv_buf, recv_stubs[n_upcalls],
726                         sizeof recv_stubs[n_upcalls]);
727         if (dpif_recv(udpif->dpif, handler->handler_id, dupcall, recv_buf)) {
728             ofpbuf_uninit(recv_buf);
729             break;
730         }
731
732         if (odp_flow_key_to_flow(dupcall->key, dupcall->key_len, flow)
733             == ODP_FIT_ERROR) {
734             goto free_dupcall;
735         }
736
737         error = upcall_receive(upcall, udpif->backer, &dupcall->packet,
738                                dupcall->type, dupcall->userdata, flow,
739                                &dupcall->ufid, PMD_ID_NULL);
740         if (error) {
741             if (error == ENODEV) {
742                 /* Received packet on datapath port for which we couldn't
743                  * associate an ofproto.  This can happen if a port is removed
744                  * while traffic is being received.  Print a rate-limited
745                  * message in case it happens frequently. */
746                 dpif_flow_put(udpif->dpif, DPIF_FP_CREATE, dupcall->key,
747                               dupcall->key_len, NULL, 0, NULL, 0,
748                               &dupcall->ufid, PMD_ID_NULL, NULL);
749                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
750                              "port %"PRIu32, flow->in_port.odp_port);
751             }
752             goto free_dupcall;
753         }
754
755         upcall->key = dupcall->key;
756         upcall->key_len = dupcall->key_len;
757         upcall->ufid = &dupcall->ufid;
758
759         upcall->out_tun_key = dupcall->out_tun_key;
760         upcall->actions = dupcall->actions;
761
762         if (vsp_adjust_flow(upcall->ofproto, flow, &dupcall->packet)) {
763             upcall->vsp_adjusted = true;
764         }
765
766         pkt_metadata_from_flow(&dupcall->packet.md, flow);
767         flow_extract(&dupcall->packet, flow);
768
769         error = process_upcall(udpif, upcall,
770                                &upcall->odp_actions, &upcall->wc);
771         if (error) {
772             goto cleanup;
773         }
774
775         n_upcalls++;
776         continue;
777
778 cleanup:
779         upcall_uninit(upcall);
780 free_dupcall:
781         dp_packet_uninit(&dupcall->packet);
782         ofpbuf_uninit(recv_buf);
783     }
784
785     if (n_upcalls) {
786         handle_upcalls(handler->udpif, upcalls, n_upcalls);
787         for (i = 0; i < n_upcalls; i++) {
788             dp_packet_uninit(&dupcalls[i].packet);
789             ofpbuf_uninit(&recv_bufs[i]);
790             upcall_uninit(&upcalls[i]);
791         }
792     }
793
794     return n_upcalls;
795 }
796
797 static void *
798 udpif_revalidator(void *arg)
799 {
800     /* Used by all revalidators. */
801     struct revalidator *revalidator = arg;
802     struct udpif *udpif = revalidator->udpif;
803     bool leader = revalidator == &udpif->revalidators[0];
804
805     /* Used only by the leader. */
806     long long int start_time = 0;
807     uint64_t last_reval_seq = 0;
808     size_t n_flows = 0;
809
810     revalidator->id = ovsthread_id_self();
811     for (;;) {
812         if (leader) {
813             uint64_t reval_seq;
814
815             recirc_run(); /* Recirculation cleanup. */
816
817             reval_seq = seq_read(udpif->reval_seq);
818             last_reval_seq = reval_seq;
819
820             n_flows = udpif_get_n_flows(udpif);
821             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
822             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
823
824             /* Only the leader checks the pause latch to prevent a race where
825              * some threads think it's false and proceed to block on
826              * reval_barrier and others think it's true and block indefinitely
827              * on the pause_barrier */
828             udpif->pause = latch_is_set(&udpif->pause_latch);
829
830             /* Only the leader checks the exit latch to prevent a race where
831              * some threads think it's true and exit and others think it's
832              * false and block indefinitely on the reval_barrier */
833             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
834
835             start_time = time_msec();
836             if (!udpif->reval_exit) {
837                 bool terse_dump;
838
839                 terse_dump = udpif_use_ufid(udpif);
840                 udpif->dump = dpif_flow_dump_create(udpif->dpif, terse_dump);
841             }
842         }
843
844         /* Wait for the leader to start the flow dump. */
845         ovs_barrier_block(&udpif->reval_barrier);
846         if (udpif->pause) {
847             revalidator_pause(revalidator);
848         }
849
850         if (udpif->reval_exit) {
851             break;
852         }
853         revalidate(revalidator);
854
855         /* Wait for all flows to have been dumped before we garbage collect. */
856         ovs_barrier_block(&udpif->reval_barrier);
857         revalidator_sweep(revalidator);
858
859         /* Wait for all revalidators to finish garbage collection. */
860         ovs_barrier_block(&udpif->reval_barrier);
861
862         if (leader) {
863             unsigned int flow_limit;
864             long long int duration;
865
866             atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
867
868             dpif_flow_dump_destroy(udpif->dump);
869             seq_change(udpif->dump_seq);
870
871             duration = MAX(time_msec() - start_time, 1);
872             udpif->dump_duration = duration;
873             if (duration > 2000) {
874                 flow_limit /= duration / 1000;
875             } else if (duration > 1300) {
876                 flow_limit = flow_limit * 3 / 4;
877             } else if (duration < 1000 && n_flows > 2000
878                        && flow_limit < n_flows * 1000 / duration) {
879                 flow_limit += 1000;
880             }
881             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
882             atomic_store_relaxed(&udpif->flow_limit, flow_limit);
883
884             if (duration > 2000) {
885                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
886                           duration);
887             }
888
889             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
890             seq_wait(udpif->reval_seq, last_reval_seq);
891             latch_wait(&udpif->exit_latch);
892             latch_wait(&udpif->pause_latch);
893             poll_block();
894         }
895     }
896
897     return NULL;
898 }
899 \f
900 static enum upcall_type
901 classify_upcall(enum dpif_upcall_type type, const struct nlattr *userdata)
902 {
903     union user_action_cookie cookie;
904     size_t userdata_len;
905
906     /* First look at the upcall type. */
907     switch (type) {
908     case DPIF_UC_ACTION:
909         break;
910
911     case DPIF_UC_MISS:
912         return MISS_UPCALL;
913
914     case DPIF_N_UC_TYPES:
915     default:
916         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32, type);
917         return BAD_UPCALL;
918     }
919
920     /* "action" upcalls need a closer look. */
921     if (!userdata) {
922         VLOG_WARN_RL(&rl, "action upcall missing cookie");
923         return BAD_UPCALL;
924     }
925     userdata_len = nl_attr_get_size(userdata);
926     if (userdata_len < sizeof cookie.type
927         || userdata_len > sizeof cookie) {
928         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
929                      userdata_len);
930         return BAD_UPCALL;
931     }
932     memset(&cookie, 0, sizeof cookie);
933     memcpy(&cookie, nl_attr_get(userdata), userdata_len);
934     if (userdata_len == MAX(8, sizeof cookie.sflow)
935         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
936         return SFLOW_UPCALL;
937     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
938                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
939         return MISS_UPCALL;
940     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
941                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
942         return FLOW_SAMPLE_UPCALL;
943     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
944                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
945         return IPFIX_UPCALL;
946     } else {
947         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
948                      " and size %"PRIuSIZE, cookie.type, userdata_len);
949         return BAD_UPCALL;
950     }
951 }
952
953 /* Calculates slow path actions for 'xout'.  'buf' must statically be
954  * initialized with at least 128 bytes of space. */
955 static void
956 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
957                   const struct flow *flow, odp_port_t odp_in_port,
958                   struct ofpbuf *buf)
959 {
960     union user_action_cookie cookie;
961     odp_port_t port;
962     uint32_t pid;
963
964     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
965     cookie.slow_path.unused = 0;
966     cookie.slow_path.reason = xout->slow;
967
968     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
969         ? ODPP_NONE
970         : odp_in_port;
971     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
972     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path,
973                              ODPP_NONE, false, buf);
974 }
975
976 /* If there is no error, the upcall must be destroyed with upcall_uninit()
977  * before quiescing, as the referred objects are guaranteed to exist only
978  * until the calling thread quiesces.  Otherwise, do not call upcall_uninit()
979  * since the 'upcall->put_actions' remains uninitialized. */
980 static int
981 upcall_receive(struct upcall *upcall, const struct dpif_backer *backer,
982                const struct dp_packet *packet, enum dpif_upcall_type type,
983                const struct nlattr *userdata, const struct flow *flow,
984                const ovs_u128 *ufid, const unsigned pmd_id)
985 {
986     int error;
987
988     error = xlate_lookup(backer, flow, &upcall->ofproto, &upcall->ipfix,
989                          &upcall->sflow, NULL, &upcall->in_port);
990     if (error) {
991         return error;
992     }
993
994     upcall->recirc = NULL;
995     upcall->have_recirc_ref = false;
996     upcall->flow = flow;
997     upcall->packet = packet;
998     upcall->ufid = ufid;
999     upcall->pmd_id = pmd_id;
1000     upcall->type = type;
1001     upcall->userdata = userdata;
1002     ofpbuf_use_stub(&upcall->odp_actions, upcall->odp_actions_stub,
1003                     sizeof upcall->odp_actions_stub);
1004     ofpbuf_init(&upcall->put_actions, 0);
1005
1006     upcall->xout_initialized = false;
1007     upcall->vsp_adjusted = false;
1008     upcall->ukey_persists = false;
1009
1010     upcall->ukey = NULL;
1011     upcall->key = NULL;
1012     upcall->key_len = 0;
1013
1014     upcall->out_tun_key = NULL;
1015     upcall->actions = NULL;
1016
1017     return 0;
1018 }
1019
1020 static void
1021 upcall_xlate(struct udpif *udpif, struct upcall *upcall,
1022              struct ofpbuf *odp_actions, struct flow_wildcards *wc)
1023 {
1024     struct dpif_flow_stats stats;
1025     struct xlate_in xin;
1026
1027     stats.n_packets = 1;
1028     stats.n_bytes = dp_packet_size(upcall->packet);
1029     stats.used = time_msec();
1030     stats.tcp_flags = ntohs(upcall->flow->tcp_flags);
1031
1032     xlate_in_init(&xin, upcall->ofproto, upcall->flow, upcall->in_port, NULL,
1033                   stats.tcp_flags, upcall->packet, wc, odp_actions);
1034
1035     if (upcall->type == DPIF_UC_MISS) {
1036         xin.resubmit_stats = &stats;
1037
1038         if (xin.recirc) {
1039             /* We may install a datapath flow only if we get a reference to the
1040              * recirculation context (otherwise we could have recirculation
1041              * upcalls using recirculation ID for which no context can be
1042              * found).  We may still execute the flow's actions even if we
1043              * don't install the flow. */
1044             upcall->recirc = xin.recirc;
1045             upcall->have_recirc_ref = recirc_id_node_try_ref_rcu(xin.recirc);
1046         }
1047     } else {
1048         /* For non-miss upcalls, we are either executing actions (one of which
1049          * is an userspace action) for an upcall, in which case the stats have
1050          * already been taken care of, or there's a flow in the datapath which
1051          * this packet was accounted to.  Presumably the revalidators will deal
1052          * with pushing its stats eventually. */
1053     }
1054
1055     upcall->dump_seq = seq_read(udpif->dump_seq);
1056     upcall->reval_seq = seq_read(udpif->reval_seq);
1057     xlate_actions(&xin, &upcall->xout);
1058     upcall->xout_initialized = true;
1059
1060     /* Special case for fail-open mode.
1061      *
1062      * If we are in fail-open mode, but we are connected to a controller too,
1063      * then we should send the packet up to the controller in the hope that it
1064      * will try to set up a flow and thereby allow us to exit fail-open.
1065      *
1066      * See the top-level comment in fail-open.c for more information.
1067      *
1068      * Copy packets before they are modified by execution. */
1069     if (upcall->xout.fail_open) {
1070         const struct dp_packet *packet = upcall->packet;
1071         struct ofproto_packet_in *pin;
1072
1073         pin = xmalloc(sizeof *pin);
1074         pin->up.packet = xmemdup(dp_packet_data(packet), dp_packet_size(packet));
1075         pin->up.packet_len = dp_packet_size(packet);
1076         pin->up.reason = OFPR_NO_MATCH;
1077         pin->up.table_id = 0;
1078         pin->up.cookie = OVS_BE64_MAX;
1079         flow_get_metadata(upcall->flow, &pin->up.flow_metadata);
1080         pin->send_len = 0; /* Not used for flow table misses. */
1081         pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
1082         ofproto_dpif_send_packet_in(upcall->ofproto, pin);
1083     }
1084
1085     if (!upcall->xout.slow) {
1086         ofpbuf_use_const(&upcall->put_actions,
1087                          odp_actions->data, odp_actions->size);
1088     } else {
1089         ofpbuf_init(&upcall->put_actions, 0);
1090         compose_slow_path(udpif, &upcall->xout, upcall->flow,
1091                           upcall->flow->in_port.odp_port,
1092                           &upcall->put_actions);
1093     }
1094
1095     /* This function is also called for slow-pathed flows.  As we are only
1096      * going to create new datapath flows for actual datapath misses, there is
1097      * no point in creating a ukey otherwise. */
1098     if (upcall->type == DPIF_UC_MISS) {
1099         upcall->ukey = ukey_create_from_upcall(upcall, wc);
1100     }
1101 }
1102
1103 static void
1104 upcall_uninit(struct upcall *upcall)
1105 {
1106     if (upcall) {
1107         if (upcall->xout_initialized) {
1108             xlate_out_uninit(&upcall->xout);
1109         }
1110         ofpbuf_uninit(&upcall->odp_actions);
1111         ofpbuf_uninit(&upcall->put_actions);
1112         if (upcall->ukey) {
1113             if (!upcall->ukey_persists) {
1114                 ukey_delete__(upcall->ukey);
1115             }
1116         } else if (upcall->have_recirc_ref) {
1117             /* The reference was transferred to the ukey if one was created. */
1118             recirc_id_node_unref(upcall->recirc);
1119         }
1120     }
1121 }
1122
1123 static int
1124 upcall_cb(const struct dp_packet *packet, const struct flow *flow, ovs_u128 *ufid,
1125           unsigned pmd_id, enum dpif_upcall_type type,
1126           const struct nlattr *userdata, struct ofpbuf *actions,
1127           struct flow_wildcards *wc, struct ofpbuf *put_actions, void *aux)
1128 {
1129     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
1130     struct udpif *udpif = aux;
1131     unsigned int flow_limit;
1132     struct upcall upcall;
1133     bool megaflow;
1134     int error;
1135
1136     atomic_read_relaxed(&enable_megaflows, &megaflow);
1137     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
1138
1139     error = upcall_receive(&upcall, udpif->backer, packet, type, userdata,
1140                            flow, ufid, pmd_id);
1141     if (error) {
1142         return error;
1143     }
1144
1145     error = process_upcall(udpif, &upcall, actions, wc);
1146     if (error) {
1147         goto out;
1148     }
1149
1150     if (upcall.xout.slow && put_actions) {
1151         ofpbuf_put(put_actions, upcall.put_actions.data,
1152                    upcall.put_actions.size);
1153     }
1154
1155     if (OVS_UNLIKELY(!megaflow)) {
1156         flow_wildcards_init_for_packet(wc, flow);
1157     }
1158
1159     if (udpif_get_n_flows(udpif) >= flow_limit) {
1160         VLOG_WARN_RL(&rl, "upcall_cb failure: datapath flow limit reached");
1161         error = ENOSPC;
1162         goto out;
1163     }
1164
1165     /* Prevent miss flow installation if the key has recirculation ID but we
1166      * were not able to get a reference on it. */
1167     if (type == DPIF_UC_MISS && upcall.recirc && !upcall.have_recirc_ref) {
1168         VLOG_WARN_RL(&rl, "upcall_cb failure: no reference for recirc flow");
1169         error = ENOSPC;
1170         goto out;
1171     }
1172
1173     if (upcall.ukey && !ukey_install(udpif, upcall.ukey)) {
1174         VLOG_WARN_RL(&rl, "upcall_cb failure: ukey installation fails");
1175         error = ENOSPC;
1176     }
1177 out:
1178     if (!error) {
1179         upcall.ukey_persists = true;
1180     }
1181     upcall_uninit(&upcall);
1182     return error;
1183 }
1184
1185 static int
1186 process_upcall(struct udpif *udpif, struct upcall *upcall,
1187                struct ofpbuf *odp_actions, struct flow_wildcards *wc)
1188 {
1189     const struct nlattr *userdata = upcall->userdata;
1190     const struct dp_packet *packet = upcall->packet;
1191     const struct flow *flow = upcall->flow;
1192
1193     switch (classify_upcall(upcall->type, userdata)) {
1194     case MISS_UPCALL:
1195         upcall_xlate(udpif, upcall, odp_actions, wc);
1196         return 0;
1197
1198     case SFLOW_UPCALL:
1199         if (upcall->sflow) {
1200             union user_action_cookie cookie;
1201             const struct nlattr *actions;
1202             size_t actions_len = 0;
1203             struct dpif_sflow_actions sflow_actions;
1204             memset(&sflow_actions, 0, sizeof sflow_actions);
1205             memset(&cookie, 0, sizeof cookie);
1206             memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.sflow);
1207             if (upcall->actions) {
1208                 /* Actions were passed up from datapath. */
1209                 actions = nl_attr_get(upcall->actions);
1210                 actions_len = nl_attr_get_size(upcall->actions);
1211                 if (actions && actions_len) {
1212                     dpif_sflow_read_actions(flow, actions, actions_len,
1213                                             &sflow_actions);
1214                 }
1215             }
1216             if (actions_len == 0) {
1217                 /* Lookup actions in userspace cache. */
1218                 struct udpif_key *ukey = ukey_lookup(udpif, upcall->ufid);
1219                 if (ukey) {
1220                     ukey_get_actions(ukey, &actions, &actions_len);
1221                     dpif_sflow_read_actions(flow, actions, actions_len,
1222                                             &sflow_actions);
1223                 }
1224             }
1225             dpif_sflow_received(upcall->sflow, packet, flow,
1226                                 flow->in_port.odp_port, &cookie,
1227                                 actions_len > 0 ? &sflow_actions : NULL);
1228         }
1229         break;
1230
1231     case IPFIX_UPCALL:
1232         if (upcall->ipfix) {
1233             union user_action_cookie cookie;
1234             struct flow_tnl output_tunnel_key;
1235
1236             memset(&cookie, 0, sizeof cookie);
1237             memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.ipfix);
1238
1239             if (upcall->out_tun_key) {
1240                 odp_tun_key_from_attr(upcall->out_tun_key, false,
1241                                       &output_tunnel_key);
1242             }
1243             dpif_ipfix_bridge_sample(upcall->ipfix, packet, flow,
1244                                      flow->in_port.odp_port,
1245                                      cookie.ipfix.output_odp_port,
1246                                      upcall->out_tun_key ?
1247                                          &output_tunnel_key : NULL);
1248         }
1249         break;
1250
1251     case FLOW_SAMPLE_UPCALL:
1252         if (upcall->ipfix) {
1253             union user_action_cookie cookie;
1254
1255             memset(&cookie, 0, sizeof cookie);
1256             memcpy(&cookie, nl_attr_get(userdata), sizeof cookie.flow_sample);
1257
1258             /* The flow reflects exactly the contents of the packet.
1259              * Sample the packet using it. */
1260             dpif_ipfix_flow_sample(upcall->ipfix, packet, flow,
1261                                    cookie.flow_sample.collector_set_id,
1262                                    cookie.flow_sample.probability,
1263                                    cookie.flow_sample.obs_domain_id,
1264                                    cookie.flow_sample.obs_point_id);
1265         }
1266         break;
1267
1268     case BAD_UPCALL:
1269         break;
1270     }
1271
1272     return EAGAIN;
1273 }
1274
1275 static void
1276 handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
1277                size_t n_upcalls)
1278 {
1279     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
1280     struct ukey_op ops[UPCALL_MAX_BATCH * 2];
1281     unsigned int flow_limit;
1282     size_t n_ops, n_opsp, i;
1283     bool may_put;
1284     bool megaflow;
1285
1286     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
1287     atomic_read_relaxed(&enable_megaflows, &megaflow);
1288
1289     may_put = udpif_get_n_flows(udpif) < flow_limit;
1290
1291     /* Handle the packets individually in order of arrival.
1292      *
1293      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
1294      *     processes received packets for these protocols.
1295      *
1296      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
1297      *     controller.
1298      *
1299      * The loop fills 'ops' with an array of operations to execute in the
1300      * datapath. */
1301     n_ops = 0;
1302     for (i = 0; i < n_upcalls; i++) {
1303         struct upcall *upcall = &upcalls[i];
1304         const struct dp_packet *packet = upcall->packet;
1305         struct ukey_op *op;
1306
1307         if (upcall->vsp_adjusted) {
1308             /* This packet was received on a VLAN splinter port.  We added a
1309              * VLAN to the packet to make the packet resemble the flow, but the
1310              * actions were composed assuming that the packet contained no
1311              * VLAN.  So, we must remove the VLAN header from the packet before
1312              * trying to execute the actions. */
1313             if (upcall->odp_actions.size) {
1314                 eth_pop_vlan(CONST_CAST(struct dp_packet *, upcall->packet));
1315             }
1316
1317             /* Remove the flow vlan tags inserted by vlan splinter logic
1318              * to ensure megaflow masks generated match the data path flow. */
1319             CONST_CAST(struct flow *, upcall->flow)->vlan_tci = 0;
1320         }
1321
1322         /* Do not install a flow into the datapath if:
1323          *
1324          *    - The datapath already has too many flows.
1325          *
1326          *    - We received this packet via some flow installed in the kernel
1327          *      already.
1328          *
1329          *    - Upcall was a recirculation but we do not have a reference to
1330          *      to the recirculation ID. */
1331         if (may_put && upcall->type == DPIF_UC_MISS &&
1332             (!upcall->recirc || upcall->have_recirc_ref)) {
1333             struct udpif_key *ukey = upcall->ukey;
1334
1335             upcall->ukey_persists = true;
1336             op = &ops[n_ops++];
1337
1338             op->ukey = ukey;
1339             op->dop.type = DPIF_OP_FLOW_PUT;
1340             op->dop.u.flow_put.flags = DPIF_FP_CREATE;
1341             op->dop.u.flow_put.key = ukey->key;
1342             op->dop.u.flow_put.key_len = ukey->key_len;
1343             op->dop.u.flow_put.mask = ukey->mask;
1344             op->dop.u.flow_put.mask_len = ukey->mask_len;
1345             op->dop.u.flow_put.ufid = upcall->ufid;
1346             op->dop.u.flow_put.stats = NULL;
1347             ukey_get_actions(ukey, &op->dop.u.flow_put.actions,
1348                              &op->dop.u.flow_put.actions_len);
1349         }
1350
1351         if (upcall->odp_actions.size) {
1352             op = &ops[n_ops++];
1353             op->ukey = NULL;
1354             op->dop.type = DPIF_OP_EXECUTE;
1355             op->dop.u.execute.packet = CONST_CAST(struct dp_packet *, packet);
1356             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
1357                                     &op->dop.u.execute.packet->md);
1358             op->dop.u.execute.actions = upcall->odp_actions.data;
1359             op->dop.u.execute.actions_len = upcall->odp_actions.size;
1360             op->dop.u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
1361             op->dop.u.execute.probe = false;
1362         }
1363     }
1364
1365     /* Execute batch.
1366      *
1367      * We install ukeys before installing the flows, locking them for exclusive
1368      * access by this thread for the period of installation. This ensures that
1369      * other threads won't attempt to delete the flows as we are creating them.
1370      */
1371     n_opsp = 0;
1372     for (i = 0; i < n_ops; i++) {
1373         struct udpif_key *ukey = ops[i].ukey;
1374
1375         if (ukey) {
1376             /* If we can't install the ukey, don't install the flow. */
1377             if (!ukey_install_start(udpif, ukey)) {
1378                 ukey_delete__(ukey);
1379                 ops[i].ukey = NULL;
1380                 continue;
1381             }
1382         }
1383         opsp[n_opsp++] = &ops[i].dop;
1384     }
1385     dpif_operate(udpif->dpif, opsp, n_opsp);
1386     for (i = 0; i < n_ops; i++) {
1387         if (ops[i].ukey) {
1388             ukey_install_finish(ops[i].ukey, ops[i].dop.error);
1389         }
1390     }
1391 }
1392
1393 static uint32_t
1394 get_ufid_hash(const ovs_u128 *ufid)
1395 {
1396     return ufid->u32[0];
1397 }
1398
1399 static struct udpif_key *
1400 ukey_lookup(struct udpif *udpif, const ovs_u128 *ufid)
1401 {
1402     struct udpif_key *ukey;
1403     int idx = get_ufid_hash(ufid) % N_UMAPS;
1404     struct cmap *cmap = &udpif->ukeys[idx].cmap;
1405
1406     CMAP_FOR_EACH_WITH_HASH (ukey, cmap_node, get_ufid_hash(ufid), cmap) {
1407         if (ovs_u128_equals(&ukey->ufid, ufid)) {
1408             return ukey;
1409         }
1410     }
1411     return NULL;
1412 }
1413
1414 /* Provides safe lockless access of RCU protected 'ukey->actions'.  Callers may
1415  * alternatively access the field directly if they take 'ukey->mutex'. */
1416 static void
1417 ukey_get_actions(struct udpif_key *ukey, const struct nlattr **actions, size_t *size)
1418 {
1419     const struct ofpbuf *buf = ovsrcu_get(struct ofpbuf *, &ukey->actions);
1420     *actions = buf->data;
1421     *size = buf->size;
1422 }
1423
1424 static void
1425 ukey_set_actions(struct udpif_key *ukey, const struct ofpbuf *actions)
1426 {
1427     ovsrcu_postpone(ofpbuf_delete,
1428                     ovsrcu_get_protected(struct ofpbuf *, &ukey->actions));
1429     ovsrcu_set(&ukey->actions, ofpbuf_clone(actions));
1430 }
1431
1432 static struct udpif_key *
1433 ukey_create__(const struct nlattr *key, size_t key_len,
1434               const struct nlattr *mask, size_t mask_len,
1435               bool ufid_present, const ovs_u128 *ufid,
1436               const unsigned pmd_id, const struct ofpbuf *actions,
1437               uint64_t dump_seq, uint64_t reval_seq, long long int used,
1438               const struct recirc_id_node *key_recirc, struct xlate_out *xout)
1439     OVS_NO_THREAD_SAFETY_ANALYSIS
1440 {
1441     unsigned n_recircs = (key_recirc ? 1 : 0) + (xout ? xout->n_recircs : 0);
1442     struct udpif_key *ukey = xmalloc(sizeof *ukey +
1443                                      n_recircs * sizeof *ukey->recircs);
1444
1445     memcpy(&ukey->keybuf, key, key_len);
1446     ukey->key = &ukey->keybuf.nla;
1447     ukey->key_len = key_len;
1448     memcpy(&ukey->maskbuf, mask, mask_len);
1449     ukey->mask = &ukey->maskbuf.nla;
1450     ukey->mask_len = mask_len;
1451     ukey->ufid_present = ufid_present;
1452     ukey->ufid = *ufid;
1453     ukey->pmd_id = pmd_id;
1454     ukey->hash = get_ufid_hash(&ukey->ufid);
1455
1456     ovsrcu_init(&ukey->actions, NULL);
1457     ukey_set_actions(ukey, actions);
1458
1459     ovs_mutex_init(&ukey->mutex);
1460     ukey->dump_seq = dump_seq;
1461     ukey->reval_seq = reval_seq;
1462     ukey->flow_exists = false;
1463     ukey->created = time_msec();
1464     memset(&ukey->stats, 0, sizeof ukey->stats);
1465     ukey->stats.used = used;
1466     ukey->xcache = NULL;
1467
1468     ukey->n_recircs = n_recircs;
1469     if (key_recirc) {
1470         ukey->recircs[0] = key_recirc->id;
1471     }
1472     if (xout && xout->n_recircs) {
1473         const uint32_t *act_recircs = xlate_out_get_recircs(xout);
1474
1475         memcpy(ukey->recircs + (key_recirc ? 1 : 0), act_recircs,
1476                xout->n_recircs * sizeof *ukey->recircs);
1477         xlate_out_take_recircs(xout);
1478     }
1479     return ukey;
1480 }
1481
1482 static struct udpif_key *
1483 ukey_create_from_upcall(struct upcall *upcall, struct flow_wildcards *wc)
1484 {
1485     struct odputil_keybuf keystub, maskstub;
1486     struct ofpbuf keybuf, maskbuf;
1487     bool megaflow;
1488     struct odp_flow_key_parms odp_parms = {
1489         .flow = upcall->flow,
1490         .mask = &wc->masks,
1491     };
1492
1493     odp_parms.support = ofproto_dpif_get_support(upcall->ofproto)->odp;
1494     if (upcall->key_len) {
1495         ofpbuf_use_const(&keybuf, upcall->key, upcall->key_len);
1496     } else {
1497         /* dpif-netdev doesn't provide a netlink-formatted flow key in the
1498          * upcall, so convert the upcall's flow here. */
1499         ofpbuf_use_stack(&keybuf, &keystub, sizeof keystub);
1500         odp_parms.odp_in_port = upcall->flow->in_port.odp_port;
1501         odp_flow_key_from_flow(&odp_parms, &keybuf);
1502     }
1503
1504     atomic_read_relaxed(&enable_megaflows, &megaflow);
1505     ofpbuf_use_stack(&maskbuf, &maskstub, sizeof maskstub);
1506     if (megaflow) {
1507         odp_parms.odp_in_port = ODPP_NONE;
1508         odp_parms.key_buf = &keybuf;
1509
1510         odp_flow_key_from_mask(&odp_parms, &maskbuf);
1511     }
1512
1513     return ukey_create__(keybuf.data, keybuf.size, maskbuf.data, maskbuf.size,
1514                          true, upcall->ufid, upcall->pmd_id,
1515                          &upcall->put_actions, upcall->dump_seq,
1516                          upcall->reval_seq, 0,
1517                          upcall->have_recirc_ref ? upcall->recirc : NULL,
1518                          &upcall->xout);
1519 }
1520
1521 static int
1522 ukey_create_from_dpif_flow(const struct udpif *udpif,
1523                            const struct dpif_flow *flow,
1524                            struct udpif_key **ukey)
1525 {
1526     struct dpif_flow full_flow;
1527     struct ofpbuf actions;
1528     uint64_t dump_seq, reval_seq;
1529     uint64_t stub[DPIF_FLOW_BUFSIZE / 8];
1530     const struct nlattr *a;
1531     unsigned int left;
1532
1533     if (!flow->key_len || !flow->actions_len) {
1534         struct ofpbuf buf;
1535         int err;
1536
1537         /* If the key or actions were not provided by the datapath, fetch the
1538          * full flow. */
1539         ofpbuf_use_stack(&buf, &stub, sizeof stub);
1540         err = dpif_flow_get(udpif->dpif, NULL, 0, &flow->ufid,
1541                             flow->pmd_id, &buf, &full_flow);
1542         if (err) {
1543             return err;
1544         }
1545         flow = &full_flow;
1546     }
1547
1548     /* Check the flow actions for recirculation action.  As recirculation
1549      * relies on OVS userspace internal state, we need to delete all old
1550      * datapath flows with recirculation upon OVS restart. */
1551     NL_ATTR_FOR_EACH_UNSAFE (a, left, flow->actions, flow->actions_len) {
1552         if (nl_attr_type(a) == OVS_ACTION_ATTR_RECIRC) {
1553             return EINVAL;
1554         }
1555     }
1556
1557     dump_seq = seq_read(udpif->dump_seq);
1558     reval_seq = seq_read(udpif->reval_seq);
1559     ofpbuf_use_const(&actions, &flow->actions, flow->actions_len);
1560     *ukey = ukey_create__(flow->key, flow->key_len,
1561                           flow->mask, flow->mask_len, flow->ufid_present,
1562                           &flow->ufid, flow->pmd_id, &actions, dump_seq,
1563                           reval_seq, flow->stats.used, NULL, NULL);
1564
1565     return 0;
1566 }
1567
1568 /* Attempts to insert a ukey into the shared ukey maps.
1569  *
1570  * On success, returns true, installs the ukey and returns it in a locked
1571  * state. Otherwise, returns false. */
1572 static bool
1573 ukey_install_start(struct udpif *udpif, struct udpif_key *new_ukey)
1574     OVS_TRY_LOCK(true, new_ukey->mutex)
1575 {
1576     struct umap *umap;
1577     struct udpif_key *old_ukey;
1578     uint32_t idx;
1579     bool locked = false;
1580
1581     idx = new_ukey->hash % N_UMAPS;
1582     umap = &udpif->ukeys[idx];
1583     ovs_mutex_lock(&umap->mutex);
1584     old_ukey = ukey_lookup(udpif, &new_ukey->ufid);
1585     if (old_ukey) {
1586         /* Uncommon case: A ukey is already installed with the same UFID. */
1587         if (old_ukey->key_len == new_ukey->key_len
1588             && !memcmp(old_ukey->key, new_ukey->key, new_ukey->key_len)) {
1589             COVERAGE_INC(handler_duplicate_upcall);
1590         } else {
1591             struct ds ds = DS_EMPTY_INITIALIZER;
1592
1593             odp_format_ufid(&old_ukey->ufid, &ds);
1594             ds_put_cstr(&ds, " ");
1595             odp_flow_key_format(old_ukey->key, old_ukey->key_len, &ds);
1596             ds_put_cstr(&ds, "\n");
1597             odp_format_ufid(&new_ukey->ufid, &ds);
1598             ds_put_cstr(&ds, " ");
1599             odp_flow_key_format(new_ukey->key, new_ukey->key_len, &ds);
1600
1601             VLOG_WARN_RL(&rl, "Conflicting ukey for flows:\n%s", ds_cstr(&ds));
1602             ds_destroy(&ds);
1603         }
1604     } else {
1605         ovs_mutex_lock(&new_ukey->mutex);
1606         cmap_insert(&umap->cmap, &new_ukey->cmap_node, new_ukey->hash);
1607         locked = true;
1608     }
1609     ovs_mutex_unlock(&umap->mutex);
1610
1611     return locked;
1612 }
1613
1614 static void
1615 ukey_install_finish__(struct udpif_key *ukey) OVS_REQUIRES(ukey->mutex)
1616 {
1617     ukey->flow_exists = true;
1618 }
1619
1620 static bool
1621 ukey_install_finish(struct udpif_key *ukey, int error)
1622     OVS_RELEASES(ukey->mutex)
1623 {
1624     if (!error) {
1625         ukey_install_finish__(ukey);
1626     }
1627     ovs_mutex_unlock(&ukey->mutex);
1628
1629     return !error;
1630 }
1631
1632 static bool
1633 ukey_install(struct udpif *udpif, struct udpif_key *ukey)
1634 {
1635     /* The usual way to keep 'ukey->flow_exists' in sync with the datapath is
1636      * to call ukey_install_start(), install the corresponding datapath flow,
1637      * then call ukey_install_finish(). The netdev interface using upcall_cb()
1638      * doesn't provide a function to separately finish the flow installation,
1639      * so we perform the operations together here.
1640      *
1641      * This is fine currently, as revalidator threads will only delete this
1642      * ukey during revalidator_sweep() and only if the dump_seq is mismatched.
1643      * It is unlikely for a revalidator thread to advance dump_seq and reach
1644      * the next GC phase between ukey creation and flow installation. */
1645     return ukey_install_start(udpif, ukey) && ukey_install_finish(ukey, 0);
1646 }
1647
1648 /* Searches for a ukey in 'udpif->ukeys' that matches 'flow' and attempts to
1649  * lock the ukey. If the ukey does not exist, create it.
1650  *
1651  * Returns 0 on success, setting *result to the matching ukey and returning it
1652  * in a locked state. Otherwise, returns an errno and clears *result. EBUSY
1653  * indicates that another thread is handling this flow. Other errors indicate
1654  * an unexpected condition creating a new ukey.
1655  *
1656  * *error is an output parameter provided to appease the threadsafety analyser,
1657  * and its value matches the return value. */
1658 static int
1659 ukey_acquire(struct udpif *udpif, const struct dpif_flow *flow,
1660              struct udpif_key **result, int *error)
1661     OVS_TRY_LOCK(0, (*result)->mutex)
1662 {
1663     struct udpif_key *ukey;
1664     int retval;
1665
1666     ukey = ukey_lookup(udpif, &flow->ufid);
1667     if (ukey) {
1668         retval = ovs_mutex_trylock(&ukey->mutex);
1669     } else {
1670         /* Usually we try to avoid installing flows from revalidator threads,
1671          * because locking on a umap may cause handler threads to block.
1672          * However there are certain cases, like when ovs-vswitchd is
1673          * restarted, where it is desirable to handle flows that exist in the
1674          * datapath gracefully (ie, don't just clear the datapath). */
1675         bool install;
1676
1677         retval = ukey_create_from_dpif_flow(udpif, flow, &ukey);
1678         if (retval) {
1679             goto done;
1680         }
1681         install = ukey_install_start(udpif, ukey);
1682         if (install) {
1683             ukey_install_finish__(ukey);
1684             retval = 0;
1685         } else {
1686             ukey_delete__(ukey);
1687             retval = EBUSY;
1688         }
1689     }
1690
1691 done:
1692     *error = retval;
1693     if (retval) {
1694         *result = NULL;
1695     } else {
1696         *result = ukey;
1697     }
1698     return retval;
1699 }
1700
1701 static void
1702 ukey_delete__(struct udpif_key *ukey)
1703     OVS_NO_THREAD_SAFETY_ANALYSIS
1704 {
1705     if (ukey) {
1706         for (int i = 0; i < ukey->n_recircs; i++) {
1707             recirc_free_id(ukey->recircs[i]);
1708         }
1709         xlate_cache_delete(ukey->xcache);
1710         ofpbuf_delete(ovsrcu_get(struct ofpbuf *, &ukey->actions));
1711         ovs_mutex_destroy(&ukey->mutex);
1712         free(ukey);
1713     }
1714 }
1715
1716 static void
1717 ukey_delete(struct umap *umap, struct udpif_key *ukey)
1718     OVS_REQUIRES(umap->mutex)
1719 {
1720     cmap_remove(&umap->cmap, &ukey->cmap_node, ukey->hash);
1721     ovsrcu_postpone(ukey_delete__, ukey);
1722 }
1723
1724 static bool
1725 should_revalidate(const struct udpif *udpif, uint64_t packets,
1726                   long long int used)
1727 {
1728     long long int metric, now, duration;
1729
1730     if (udpif->dump_duration < 200) {
1731         /* We are likely to handle full revalidation for the flows. */
1732         return true;
1733     }
1734
1735     /* Calculate the mean time between seeing these packets. If this
1736      * exceeds the threshold, then delete the flow rather than performing
1737      * costly revalidation for flows that aren't being hit frequently.
1738      *
1739      * This is targeted at situations where the dump_duration is high (~1s),
1740      * and revalidation is triggered by a call to udpif_revalidate(). In
1741      * these situations, revalidation of all flows causes fluctuations in the
1742      * flow_limit due to the interaction with the dump_duration and max_idle.
1743      * This tends to result in deletion of low-throughput flows anyway, so
1744      * skip the revalidation and just delete those flows. */
1745     packets = MAX(packets, 1);
1746     now = MAX(used, time_msec());
1747     duration = now - used;
1748     metric = duration / packets;
1749
1750     if (metric < 200) {
1751         /* The flow is receiving more than ~5pps, so keep it. */
1752         return true;
1753     }
1754     return false;
1755 }
1756
1757 /* Verifies that the datapath actions of 'ukey' are still correct, and pushes
1758  * 'stats' for it.
1759  *
1760  * Returns a recommended action for 'ukey', options include:
1761  *      UKEY_DELETE The ukey should be deleted.
1762  *      UKEY_KEEP   The ukey is fine as is.
1763  *      UKEY_MODIFY The ukey's actions should be changed but is otherwise
1764  *                  fine.  Callers should change the actions to those found
1765  *                  in the caller supplied 'odp_actions' buffer. */
1766 static enum reval_result
1767 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1768                 const struct dpif_flow_stats *stats,
1769                 struct ofpbuf *odp_actions, uint64_t reval_seq)
1770     OVS_REQUIRES(ukey->mutex)
1771 {
1772     struct xlate_out xout, *xoutp;
1773     struct netflow *netflow;
1774     struct ofproto_dpif *ofproto;
1775     struct dpif_flow_stats push;
1776     struct flow flow, dp_mask;
1777     struct flow_wildcards wc;
1778     enum reval_result result;
1779     uint64_t *dp64, *xout64;
1780     ofp_port_t ofp_in_port;
1781     struct xlate_in xin;
1782     long long int last_used;
1783     int error;
1784     size_t i;
1785     bool need_revalidate;
1786
1787     result = UKEY_DELETE;
1788     xoutp = NULL;
1789     netflow = NULL;
1790
1791     ofpbuf_clear(odp_actions);
1792     need_revalidate = (ukey->reval_seq != reval_seq);
1793     last_used = ukey->stats.used;
1794     push.used = stats->used;
1795     push.tcp_flags = stats->tcp_flags;
1796     push.n_packets = (stats->n_packets > ukey->stats.n_packets
1797                       ? stats->n_packets - ukey->stats.n_packets
1798                       : 0);
1799     push.n_bytes = (stats->n_bytes > ukey->stats.n_bytes
1800                     ? stats->n_bytes - ukey->stats.n_bytes
1801                     : 0);
1802
1803     if (need_revalidate && last_used
1804         && !should_revalidate(udpif, push.n_packets, last_used)) {
1805         goto exit;
1806     }
1807
1808     /* We will push the stats, so update the ukey stats cache. */
1809     ukey->stats = *stats;
1810     if (!push.n_packets && !need_revalidate) {
1811         result = UKEY_KEEP;
1812         goto exit;
1813     }
1814
1815     if (ukey->xcache && !need_revalidate) {
1816         xlate_push_stats(ukey->xcache, &push);
1817         result = UKEY_KEEP;
1818         goto exit;
1819     }
1820
1821     if (odp_flow_key_to_flow(ukey->key, ukey->key_len, &flow)
1822         == ODP_FIT_ERROR) {
1823         goto exit;
1824     }
1825
1826     error = xlate_lookup(udpif->backer, &flow, &ofproto, NULL, NULL, &netflow,
1827                          &ofp_in_port);
1828     if (error) {
1829         goto exit;
1830     }
1831
1832     if (need_revalidate) {
1833         xlate_cache_clear(ukey->xcache);
1834     }
1835     if (!ukey->xcache) {
1836         ukey->xcache = xlate_cache_new();
1837     }
1838
1839     xlate_in_init(&xin, ofproto, &flow, ofp_in_port, NULL, push.tcp_flags,
1840                   NULL, need_revalidate ? &wc : NULL, odp_actions);
1841     if (push.n_packets) {
1842         xin.resubmit_stats = &push;
1843         xin.may_learn = true;
1844     }
1845     xin.xcache = ukey->xcache;
1846     xlate_actions(&xin, &xout);
1847     xoutp = &xout;
1848
1849     if (!need_revalidate) {
1850         result = UKEY_KEEP;
1851         goto exit;
1852     }
1853
1854     if (xout.slow) {
1855         ofpbuf_clear(odp_actions);
1856         compose_slow_path(udpif, &xout, &flow, flow.in_port.odp_port,
1857                           odp_actions);
1858     }
1859
1860     if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, ukey->key,
1861                              ukey->key_len, &dp_mask, &flow) == ODP_FIT_ERROR) {
1862         goto exit;
1863     }
1864
1865     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1866      * directly check that the masks are the same.  Instead we check that the
1867      * mask in the kernel is more specific i.e. less wildcarded, than what
1868      * we've calculated here.  This guarantees we don't catch any packets we
1869      * shouldn't with the megaflow. */
1870     dp64 = (uint64_t *) &dp_mask;
1871     xout64 = (uint64_t *) &wc.masks;
1872     for (i = 0; i < FLOW_U64S; i++) {
1873         if ((dp64[i] | xout64[i]) != dp64[i]) {
1874             goto exit;
1875         }
1876     }
1877
1878     if (!ofpbuf_equal(odp_actions,
1879                       ovsrcu_get(struct ofpbuf *, &ukey->actions))) {
1880         /* The datapath mask was OK, but the actions seem to have changed.
1881          * Let's modify it in place. */
1882         result = UKEY_MODIFY;
1883         goto exit;
1884     }
1885
1886     result = UKEY_KEEP;
1887
1888 exit:
1889     if (result != UKEY_DELETE) {
1890         ukey->reval_seq = reval_seq;
1891     }
1892     if (netflow && result == UKEY_DELETE) {
1893         netflow_flow_clear(netflow, &flow);
1894     }
1895     xlate_out_uninit(xoutp);
1896     return result;
1897 }
1898
1899 static void
1900 delete_op_init__(struct udpif *udpif, struct ukey_op *op,
1901                  const struct dpif_flow *flow)
1902 {
1903     op->ukey = NULL;
1904     op->dop.type = DPIF_OP_FLOW_DEL;
1905     op->dop.u.flow_del.key = flow->key;
1906     op->dop.u.flow_del.key_len = flow->key_len;
1907     op->dop.u.flow_del.ufid = flow->ufid_present ? &flow->ufid : NULL;
1908     op->dop.u.flow_del.pmd_id = flow->pmd_id;
1909     op->dop.u.flow_del.stats = &op->stats;
1910     op->dop.u.flow_del.terse = udpif_use_ufid(udpif);
1911 }
1912
1913 static void
1914 delete_op_init(struct udpif *udpif, struct ukey_op *op, struct udpif_key *ukey)
1915 {
1916     op->ukey = ukey;
1917     op->dop.type = DPIF_OP_FLOW_DEL;
1918     op->dop.u.flow_del.key = ukey->key;
1919     op->dop.u.flow_del.key_len = ukey->key_len;
1920     op->dop.u.flow_del.ufid = ukey->ufid_present ? &ukey->ufid : NULL;
1921     op->dop.u.flow_del.pmd_id = ukey->pmd_id;
1922     op->dop.u.flow_del.stats = &op->stats;
1923     op->dop.u.flow_del.terse = udpif_use_ufid(udpif);
1924 }
1925
1926 static void
1927 modify_op_init(struct ukey_op *op, struct udpif_key *ukey)
1928 {
1929     op->ukey = ukey;
1930     op->dop.type = DPIF_OP_FLOW_PUT;
1931     op->dop.u.flow_put.flags = DPIF_FP_MODIFY;
1932     op->dop.u.flow_put.key = ukey->key;
1933     op->dop.u.flow_put.key_len = ukey->key_len;
1934     op->dop.u.flow_put.mask = ukey->mask;
1935     op->dop.u.flow_put.mask_len = ukey->mask_len;
1936     op->dop.u.flow_put.ufid = &ukey->ufid;
1937     op->dop.u.flow_put.pmd_id = ukey->pmd_id;
1938     op->dop.u.flow_put.stats = NULL;
1939     ukey_get_actions(ukey, &op->dop.u.flow_put.actions,
1940                      &op->dop.u.flow_put.actions_len);
1941 }
1942
1943 static void
1944 push_ukey_ops__(struct udpif *udpif, struct ukey_op *ops, size_t n_ops)
1945 {
1946     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1947     size_t i;
1948
1949     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1950     for (i = 0; i < n_ops; i++) {
1951         opsp[i] = &ops[i].dop;
1952     }
1953     dpif_operate(udpif->dpif, opsp, n_ops);
1954
1955     for (i = 0; i < n_ops; i++) {
1956         struct ukey_op *op = &ops[i];
1957         struct dpif_flow_stats *push, *stats, push_buf;
1958
1959         stats = op->dop.u.flow_del.stats;
1960         push = &push_buf;
1961
1962         if (op->dop.type != DPIF_OP_FLOW_DEL) {
1963             /* Only deleted flows need their stats pushed. */
1964             continue;
1965         }
1966
1967         if (op->dop.error) {
1968             /* flow_del error, 'stats' is unusable. */
1969             continue;
1970         }
1971
1972         if (op->ukey) {
1973             ovs_mutex_lock(&op->ukey->mutex);
1974             push->used = MAX(stats->used, op->ukey->stats.used);
1975             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1976             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1977             push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1978             ovs_mutex_unlock(&op->ukey->mutex);
1979         } else {
1980             push = stats;
1981         }
1982
1983         if (push->n_packets || netflow_exists()) {
1984             const struct nlattr *key = op->dop.u.flow_del.key;
1985             size_t key_len = op->dop.u.flow_del.key_len;
1986             struct ofproto_dpif *ofproto;
1987             struct netflow *netflow;
1988             ofp_port_t ofp_in_port;
1989             struct flow flow;
1990             int error;
1991
1992             if (op->ukey) {
1993                 ovs_mutex_lock(&op->ukey->mutex);
1994                 if (op->ukey->xcache) {
1995                     xlate_push_stats(op->ukey->xcache, push);
1996                     ovs_mutex_unlock(&op->ukey->mutex);
1997                     continue;
1998                 }
1999                 ovs_mutex_unlock(&op->ukey->mutex);
2000                 key = op->ukey->key;
2001                 key_len = op->ukey->key_len;
2002             }
2003
2004             if (odp_flow_key_to_flow(key, key_len, &flow)
2005                 == ODP_FIT_ERROR) {
2006                 continue;
2007             }
2008
2009             error = xlate_lookup(udpif->backer, &flow, &ofproto, NULL, NULL,
2010                                  &netflow, &ofp_in_port);
2011             if (!error) {
2012                 struct xlate_in xin;
2013
2014                 xlate_in_init(&xin, ofproto, &flow, ofp_in_port, NULL,
2015                               push->tcp_flags, NULL, NULL, NULL);
2016                 xin.resubmit_stats = push->n_packets ? push : NULL;
2017                 xin.may_learn = push->n_packets > 0;
2018                 xlate_actions_for_side_effects(&xin);
2019
2020                 if (netflow) {
2021                     netflow_flow_clear(netflow, &flow);
2022                 }
2023             }
2024         }
2025     }
2026 }
2027
2028 static void
2029 push_ukey_ops(struct udpif *udpif, struct umap *umap,
2030               struct ukey_op *ops, size_t n_ops)
2031 {
2032     int i;
2033
2034     push_ukey_ops__(udpif, ops, n_ops);
2035     ovs_mutex_lock(&umap->mutex);
2036     for (i = 0; i < n_ops; i++) {
2037         ukey_delete(umap, ops[i].ukey);
2038     }
2039     ovs_mutex_unlock(&umap->mutex);
2040 }
2041
2042 static void
2043 log_unexpected_flow(const struct dpif_flow *flow, int error)
2044 {
2045     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(10, 60);
2046     struct ds ds = DS_EMPTY_INITIALIZER;
2047
2048     ds_put_format(&ds, "Failed to acquire udpif_key corresponding to "
2049                   "unexpected flow (%s): ", ovs_strerror(error));
2050     odp_format_ufid(&flow->ufid, &ds);
2051     VLOG_WARN_RL(&rl, "%s", ds_cstr(&ds));
2052 }
2053
2054 static void
2055 revalidate(struct revalidator *revalidator)
2056 {
2057     uint64_t odp_actions_stub[1024 / 8];
2058     struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
2059
2060     struct udpif *udpif = revalidator->udpif;
2061     struct dpif_flow_dump_thread *dump_thread;
2062     uint64_t dump_seq, reval_seq;
2063     unsigned int flow_limit;
2064
2065     dump_seq = seq_read(udpif->dump_seq);
2066     reval_seq = seq_read(udpif->reval_seq);
2067     atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
2068     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
2069     for (;;) {
2070         struct ukey_op ops[REVALIDATE_MAX_BATCH];
2071         int n_ops = 0;
2072
2073         struct dpif_flow flows[REVALIDATE_MAX_BATCH];
2074         const struct dpif_flow *f;
2075         int n_dumped;
2076
2077         long long int max_idle;
2078         long long int now;
2079         size_t n_dp_flows;
2080         bool kill_them_all;
2081
2082         n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
2083         if (!n_dumped) {
2084             break;
2085         }
2086
2087         now = time_msec();
2088
2089         /* In normal operation we want to keep flows around until they have
2090          * been idle for 'ofproto_max_idle' milliseconds.  However:
2091          *
2092          *     - If the number of datapath flows climbs above 'flow_limit',
2093          *       drop that down to 100 ms to try to bring the flows down to
2094          *       the limit.
2095          *
2096          *     - If the number of datapath flows climbs above twice
2097          *       'flow_limit', delete all the datapath flows as an emergency
2098          *       measure.  (We reassess this condition for the next batch of
2099          *       datapath flows, so we will recover before all the flows are
2100          *       gone.) */
2101         n_dp_flows = udpif_get_n_flows(udpif);
2102         kill_them_all = n_dp_flows > flow_limit * 2;
2103         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
2104
2105         for (f = flows; f < &flows[n_dumped]; f++) {
2106             long long int used = f->stats.used;
2107             enum reval_result result;
2108             struct udpif_key *ukey;
2109             bool already_dumped;
2110             int error;
2111
2112             if (ukey_acquire(udpif, f, &ukey, &error)) {
2113                 if (error == EBUSY) {
2114                     /* Another thread is processing this flow, so don't bother
2115                      * processing it.*/
2116                     COVERAGE_INC(upcall_ukey_contention);
2117                 } else {
2118                     log_unexpected_flow(f, error);
2119                     if (error != ENOENT) {
2120                         delete_op_init__(udpif, &ops[n_ops++], f);
2121                     }
2122                 }
2123                 continue;
2124             }
2125
2126             already_dumped = ukey->dump_seq == dump_seq;
2127             if (already_dumped) {
2128                 /* The flow has already been handled during this flow dump
2129                  * operation. Skip it. */
2130                 if (ukey->xcache) {
2131                     COVERAGE_INC(dumped_duplicate_flow);
2132                 } else {
2133                     COVERAGE_INC(dumped_new_flow);
2134                 }
2135                 ovs_mutex_unlock(&ukey->mutex);
2136                 continue;
2137             }
2138
2139             if (!used) {
2140                 used = ukey->created;
2141             }
2142             if (kill_them_all || (used && used < now - max_idle)) {
2143                 result = UKEY_DELETE;
2144             } else {
2145                 result = revalidate_ukey(udpif, ukey, &f->stats, &odp_actions,
2146                                          reval_seq);
2147             }
2148             ukey->dump_seq = dump_seq;
2149             ukey->flow_exists = result != UKEY_DELETE;
2150
2151             if (result == UKEY_DELETE) {
2152                 delete_op_init(udpif, &ops[n_ops++], ukey);
2153             } else if (result == UKEY_MODIFY) {
2154                 ukey_set_actions(ukey, &odp_actions);
2155                 modify_op_init(&ops[n_ops++], ukey);
2156             }
2157             ovs_mutex_unlock(&ukey->mutex);
2158         }
2159
2160         if (n_ops) {
2161             push_ukey_ops__(udpif, ops, n_ops);
2162         }
2163         ovsrcu_quiesce();
2164     }
2165     dpif_flow_dump_thread_destroy(dump_thread);
2166     ofpbuf_uninit(&odp_actions);
2167 }
2168
2169 /* Pauses the 'revalidator', can only proceed after main thread
2170  * calls udpif_resume_revalidators(). */
2171 static void
2172 revalidator_pause(struct revalidator *revalidator)
2173 {
2174     /* The first block is for sync'ing the pause with main thread. */
2175     ovs_barrier_block(&revalidator->udpif->pause_barrier);
2176     /* The second block is for pausing until main thread resumes. */
2177     ovs_barrier_block(&revalidator->udpif->pause_barrier);
2178 }
2179
2180 static void
2181 revalidator_sweep__(struct revalidator *revalidator, bool purge)
2182 {
2183     struct udpif *udpif;
2184     uint64_t dump_seq, reval_seq;
2185     int slice;
2186
2187     udpif = revalidator->udpif;
2188     dump_seq = seq_read(udpif->dump_seq);
2189     reval_seq = seq_read(udpif->reval_seq);
2190     slice = revalidator - udpif->revalidators;
2191     ovs_assert(slice < udpif->n_revalidators);
2192
2193     for (int i = slice; i < N_UMAPS; i += udpif->n_revalidators) {
2194         uint64_t odp_actions_stub[1024 / 8];
2195         struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);
2196
2197         struct ukey_op ops[REVALIDATE_MAX_BATCH];
2198         struct udpif_key *ukey;
2199         struct umap *umap = &udpif->ukeys[i];
2200         size_t n_ops = 0;
2201
2202         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
2203             bool flow_exists, seq_mismatch;
2204             enum reval_result result;
2205
2206             /* Handler threads could be holding a ukey lock while it installs a
2207              * new flow, so don't hang around waiting for access to it. */
2208             if (ovs_mutex_trylock(&ukey->mutex)) {
2209                 continue;
2210             }
2211             flow_exists = ukey->flow_exists;
2212             seq_mismatch = (ukey->dump_seq != dump_seq
2213                             && ukey->reval_seq != reval_seq);
2214
2215             if (purge) {
2216                 result = UKEY_DELETE;
2217             } else if (!seq_mismatch) {
2218                 result = UKEY_KEEP;
2219             } else {
2220                 struct dpif_flow_stats stats;
2221                 COVERAGE_INC(revalidate_missed_dp_flow);
2222                 memset(&stats, 0, sizeof stats);
2223                 result = revalidate_ukey(udpif, ukey, &stats, &odp_actions,
2224                                          reval_seq);
2225             }
2226             ovs_mutex_unlock(&ukey->mutex);
2227
2228             if (result == UKEY_DELETE) {
2229                 delete_op_init(udpif, &ops[n_ops++], ukey);
2230             } else if (result == UKEY_MODIFY) {
2231                 ukey_set_actions(ukey, &odp_actions);
2232                 modify_op_init(&ops[n_ops++], ukey);
2233             }
2234
2235             if (n_ops == REVALIDATE_MAX_BATCH) {
2236                 push_ukey_ops(udpif, umap, ops, n_ops);
2237                 n_ops = 0;
2238             }
2239
2240             if (!flow_exists) {
2241                 ovs_mutex_lock(&umap->mutex);
2242                 ukey_delete(umap, ukey);
2243                 ovs_mutex_unlock(&umap->mutex);
2244             }
2245         }
2246
2247         if (n_ops) {
2248             push_ukey_ops(udpif, umap, ops, n_ops);
2249         }
2250
2251         ofpbuf_uninit(&odp_actions);
2252         ovsrcu_quiesce();
2253     }
2254 }
2255
2256 static void
2257 revalidator_sweep(struct revalidator *revalidator)
2258 {
2259     revalidator_sweep__(revalidator, false);
2260 }
2261
2262 static void
2263 revalidator_purge(struct revalidator *revalidator)
2264 {
2265     revalidator_sweep__(revalidator, true);
2266 }
2267
2268 /* In reaction to dpif purge, purges all 'ukey's with same 'pmd_id'. */
2269 static void
2270 dp_purge_cb(void *aux, unsigned pmd_id)
2271 {
2272     struct udpif *udpif = aux;
2273     size_t i;
2274
2275     udpif_pause_revalidators(udpif);
2276     for (i = 0; i < N_UMAPS; i++) {
2277         struct ukey_op ops[REVALIDATE_MAX_BATCH];
2278         struct udpif_key *ukey;
2279         struct umap *umap = &udpif->ukeys[i];
2280         size_t n_ops = 0;
2281
2282         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
2283              if (ukey->pmd_id == pmd_id) {
2284                 delete_op_init(udpif, &ops[n_ops++], ukey);
2285                 if (n_ops == REVALIDATE_MAX_BATCH) {
2286                     push_ukey_ops(udpif, umap, ops, n_ops);
2287                     n_ops = 0;
2288                 }
2289             }
2290         }
2291
2292         if (n_ops) {
2293             push_ukey_ops(udpif, umap, ops, n_ops);
2294         }
2295
2296         ovsrcu_quiesce();
2297     }
2298     udpif_resume_revalidators(udpif);
2299 }
2300 \f
2301 static void
2302 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
2303                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
2304 {
2305     struct ds ds = DS_EMPTY_INITIALIZER;
2306     struct udpif *udpif;
2307
2308     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
2309         unsigned int flow_limit;
2310         bool ufid_enabled;
2311         size_t i;
2312
2313         atomic_read_relaxed(&udpif->flow_limit, &flow_limit);
2314         ufid_enabled = udpif_use_ufid(udpif);
2315
2316         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
2317         ds_put_format(&ds, "\tflows         : (current %lu)"
2318             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
2319             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
2320         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
2321         ds_put_format(&ds, "\tufid enabled : ");
2322         if (ufid_enabled) {
2323             ds_put_format(&ds, "true\n");
2324         } else {
2325             ds_put_format(&ds, "false\n");
2326         }
2327         ds_put_char(&ds, '\n');
2328
2329         for (i = 0; i < n_revalidators; i++) {
2330             struct revalidator *revalidator = &udpif->revalidators[i];
2331             int j, elements = 0;
2332
2333             for (j = i; j < N_UMAPS; j += n_revalidators) {
2334                 elements += cmap_count(&udpif->ukeys[j].cmap);
2335             }
2336             ds_put_format(&ds, "\t%u: (keys %d)\n", revalidator->id, elements);
2337         }
2338     }
2339
2340     unixctl_command_reply(conn, ds_cstr(&ds));
2341     ds_destroy(&ds);
2342 }
2343
2344 /* Disable using the megaflows.
2345  *
2346  * This command is only needed for advanced debugging, so it's not
2347  * documented in the man page. */
2348 static void
2349 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
2350                                  int argc OVS_UNUSED,
2351                                  const char *argv[] OVS_UNUSED,
2352                                  void *aux OVS_UNUSED)
2353 {
2354     atomic_store_relaxed(&enable_megaflows, false);
2355     udpif_flush_all_datapaths();
2356     unixctl_command_reply(conn, "megaflows disabled");
2357 }
2358
2359 /* Re-enable using megaflows.
2360  *
2361  * This command is only needed for advanced debugging, so it's not
2362  * documented in the man page. */
2363 static void
2364 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
2365                                 int argc OVS_UNUSED,
2366                                 const char *argv[] OVS_UNUSED,
2367                                 void *aux OVS_UNUSED)
2368 {
2369     atomic_store_relaxed(&enable_megaflows, true);
2370     udpif_flush_all_datapaths();
2371     unixctl_command_reply(conn, "megaflows enabled");
2372 }
2373
2374 /* Disable skipping flow attributes during flow dump.
2375  *
2376  * This command is only needed for advanced debugging, so it's not
2377  * documented in the man page. */
2378 static void
2379 upcall_unixctl_disable_ufid(struct unixctl_conn *conn, int argc OVS_UNUSED,
2380                            const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
2381 {
2382     atomic_store_relaxed(&enable_ufid, false);
2383     unixctl_command_reply(conn, "Datapath dumping tersely using UFID disabled");
2384 }
2385
2386 /* Re-enable skipping flow attributes during flow dump.
2387  *
2388  * This command is only needed for advanced debugging, so it's not documented
2389  * in the man page. */
2390 static void
2391 upcall_unixctl_enable_ufid(struct unixctl_conn *conn, int argc OVS_UNUSED,
2392                           const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
2393 {
2394     atomic_store_relaxed(&enable_ufid, true);
2395     unixctl_command_reply(conn, "Datapath dumping tersely using UFID enabled "
2396                                 "for supported datapaths");
2397 }
2398
2399 /* Set the flow limit.
2400  *
2401  * This command is only needed for advanced debugging, so it's not
2402  * documented in the man page. */
2403 static void
2404 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
2405                               int argc OVS_UNUSED,
2406                               const char *argv[] OVS_UNUSED,
2407                               void *aux OVS_UNUSED)
2408 {
2409     struct ds ds = DS_EMPTY_INITIALIZER;
2410     struct udpif *udpif;
2411     unsigned int flow_limit = atoi(argv[1]);
2412
2413     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
2414         atomic_store_relaxed(&udpif->flow_limit, flow_limit);
2415     }
2416     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
2417     unixctl_command_reply(conn, ds_cstr(&ds));
2418     ds_destroy(&ds);
2419 }
2420
2421 static void
2422 upcall_unixctl_dump_wait(struct unixctl_conn *conn,
2423                          int argc OVS_UNUSED,
2424                          const char *argv[] OVS_UNUSED,
2425                          void *aux OVS_UNUSED)
2426 {
2427     if (list_is_singleton(&all_udpifs)) {
2428         struct udpif *udpif = NULL;
2429         size_t len;
2430
2431         udpif = OBJECT_CONTAINING(list_front(&all_udpifs), udpif, list_node);
2432         len = (udpif->n_conns + 1) * sizeof *udpif->conns;
2433         udpif->conn_seq = seq_read(udpif->dump_seq);
2434         udpif->conns = xrealloc(udpif->conns, len);
2435         udpif->conns[udpif->n_conns++] = conn;
2436     } else {
2437         unixctl_command_reply_error(conn, "can't wait on multiple udpifs.");
2438     }
2439 }
2440
2441 static void
2442 upcall_unixctl_purge(struct unixctl_conn *conn, int argc OVS_UNUSED,
2443                      const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
2444 {
2445     struct udpif *udpif;
2446
2447     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
2448         int n;
2449
2450         for (n = 0; n < udpif->n_revalidators; n++) {
2451             revalidator_purge(&udpif->revalidators[n]);
2452         }
2453     }
2454     unixctl_command_reply(conn, "");
2455 }