3a75690a5dfaa0729da774348e227d1511a9b823
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "dpif.h"
25 #include "dynamic-string.h"
26 #include "fail-open.h"
27 #include "guarded-list.h"
28 #include "latch.h"
29 #include "list.h"
30 #include "netlink.h"
31 #include "ofpbuf.h"
32 #include "ofproto-dpif-ipfix.h"
33 #include "ofproto-dpif-sflow.h"
34 #include "ofproto-dpif-xlate.h"
35 #include "ovs-rcu.h"
36 #include "packets.h"
37 #include "poll-loop.h"
38 #include "seq.h"
39 #include "unixctl.h"
40 #include "vlog.h"
41
42 #define MAX_QUEUE_LENGTH 512
43 #define UPCALL_MAX_BATCH 50
44 #define REVALIDATE_MAX_BATCH 50
45
46 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
47
48 COVERAGE_DEFINE(upcall_duplicate_flow);
49
50 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
51  * and possibly sets up a kernel flow as a cache. */
52 struct handler {
53     struct udpif *udpif;               /* Parent udpif. */
54     pthread_t thread;                  /* Thread ID. */
55     uint32_t handler_id;               /* Handler id. */
56 };
57
58 /* A thread that processes datapath flows, updates OpenFlow statistics, and
59  * updates or removes them if necessary. */
60 struct revalidator {
61     struct udpif *udpif;               /* Parent udpif. */
62     pthread_t thread;                  /* Thread ID. */
63     unsigned int id;                   /* ovsthread_id_self(). */
64     struct hmap *ukeys;                /* Points into udpif->ukeys for this
65                                           revalidator. Used for GC phase. */
66 };
67
68 /* An upcall handler for ofproto_dpif.
69  *
70  * udpif keeps records of two kind of logically separate units:
71  *
72  * upcall handling
73  * ---------------
74  *
75  *    - An array of 'struct handler's for upcall handling and flow
76  *      installation.
77  *
78  * flow revalidation
79  * -----------------
80  *
81  *    - Revalidation threads which read the datapath flow table and maintains
82  *      them.
83  */
84 struct udpif {
85     struct list list_node;             /* In all_udpifs list. */
86
87     struct dpif *dpif;                 /* Datapath handle. */
88     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
89
90     uint32_t secret;                   /* Random seed for upcall hash. */
91
92     struct handler *handlers;          /* Upcall handlers. */
93     size_t n_handlers;
94
95     struct revalidator *revalidators;  /* Flow revalidators. */
96     size_t n_revalidators;
97
98     struct latch exit_latch;           /* Tells child threads to exit. */
99
100     /* Revalidation. */
101     struct seq *reval_seq;             /* Incremented to force revalidation. */
102     bool need_revalidate;              /* As indicated by 'reval_seq'. */
103     bool reval_exit;                   /* Set by leader on 'exit_latch. */
104     pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
105     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
106     long long int dump_duration;       /* Duration of the last flow dump. */
107     struct seq *dump_seq;              /* Increments each dump iteration. */
108
109     /* There are 'n_revalidators' ukey hmaps. Each revalidator retains a
110      * reference to one of these for garbage collection.
111      *
112      * During the flow dump phase, revalidators insert into these with a random
113      * distribution. During the garbage collection phase, each revalidator
114      * takes care of garbage collecting one of these hmaps. */
115     struct {
116         struct ovs_mutex mutex;        /* Guards the following. */
117         struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
118     } *ukeys;
119
120     /* Datapath flow statistics. */
121     unsigned int max_n_flows;
122     unsigned int avg_n_flows;
123
124     /* Following fields are accessed and modified by different threads. */
125     atomic_uint flow_limit;            /* Datapath flow hard limit. */
126
127     /* n_flows_mutex prevents multiple threads updating these concurrently. */
128     atomic_ulong n_flows;           /* Number of flows in the datapath. */
129     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
130     struct ovs_mutex n_flows_mutex;
131 };
132
133 enum upcall_type {
134     BAD_UPCALL,                 /* Some kind of bug somewhere. */
135     MISS_UPCALL,                /* A flow miss.  */
136     SFLOW_UPCALL,               /* sFlow sample. */
137     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
138     IPFIX_UPCALL                /* Per-bridge sampling. */
139 };
140
141 struct upcall {
142     struct ofproto_dpif *ofproto;
143
144     struct flow flow;
145     const struct nlattr *key;
146     size_t key_len;
147     enum dpif_upcall_type upcall_type;
148     struct dpif_flow_stats stats;
149     odp_port_t odp_in_port;
150
151     uint64_t slow_path_buf[128 / 8];
152     struct odputil_keybuf mask_buf;
153
154     struct xlate_out xout;
155
156     /* Raw upcall plus data for keeping track of the memory backing it. */
157     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
158     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
159     uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
160 };
161
162 /* 'udpif_key's are responsible for tracking the little bit of state udpif
163  * needs to do flow expiration which can't be pulled directly from the
164  * datapath.  They may be created or maintained by any revalidator during
165  * the dump phase, but are owned by a single revalidator, and are destroyed
166  * by that revalidator during the garbage-collection phase.
167  *
168  * While some elements of a udpif_key are protected by a mutex, the ukey itself
169  * is not.  Therefore it is not safe to destroy a udpif_key except when all
170  * revalidators are in garbage collection phase, or they aren't running. */
171 struct udpif_key {
172     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
173
174     /* These elements are read only once created, and therefore aren't
175      * protected by a mutex. */
176     const struct nlattr *key;      /* Datapath flow key. */
177     size_t key_len;                /* Length of 'key'. */
178
179     struct ovs_mutex mutex;                   /* Guards the following. */
180     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
181     long long int created OVS_GUARDED;        /* Estimate of creation time. */
182     bool mark OVS_GUARDED;                    /* For mark and sweep garbage
183                                                  collection. */
184     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
185                                                  once. */
186
187     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
188                                                * are affected by this ukey.
189                                                * Used for stats and learning.*/
190     struct odputil_keybuf key_buf;            /* Memory for 'key'. */
191 };
192
193 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
194 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
195
196 static size_t read_upcalls(struct handler *,
197                            struct upcall upcalls[UPCALL_MAX_BATCH]);
198 static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
199 static void udpif_stop_threads(struct udpif *);
200 static void udpif_start_threads(struct udpif *, size_t n_handlers,
201                                 size_t n_revalidators);
202 static void *udpif_upcall_handler(void *);
203 static void *udpif_revalidator(void *);
204 static unsigned long udpif_get_n_flows(struct udpif *);
205 static void revalidate(struct revalidator *);
206 static void revalidator_sweep(struct revalidator *);
207 static void revalidator_purge(struct revalidator *);
208 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
209                                 const char *argv[], void *aux);
210 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
211                                              const char *argv[], void *aux);
212 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
213                                             const char *argv[], void *aux);
214 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
215                                             const char *argv[], void *aux);
216
217 static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
218                                      long long int used);
219 static struct udpif_key *ukey_lookup(struct udpif *udpif,
220                                      const struct nlattr *key, size_t key_len,
221                                      uint32_t hash);
222 static bool ukey_acquire(struct udpif *udpif, const struct nlattr *key,
223                          size_t key_len, long long int used,
224                          struct udpif_key **result);
225 static void ukey_delete(struct revalidator *, struct udpif_key *);
226
227 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
228
229 struct udpif *
230 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
231 {
232     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
233     struct udpif *udpif = xzalloc(sizeof *udpif);
234
235     if (ovsthread_once_start(&once)) {
236         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
237                                  NULL);
238         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
239                                  upcall_unixctl_disable_megaflows, NULL);
240         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
241                                  upcall_unixctl_enable_megaflows, NULL);
242         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
243                                  upcall_unixctl_set_flow_limit, NULL);
244         ovsthread_once_done(&once);
245     }
246
247     udpif->dpif = dpif;
248     udpif->backer = backer;
249     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
250     udpif->secret = random_uint32();
251     udpif->reval_seq = seq_create();
252     udpif->dump_seq = seq_create();
253     latch_init(&udpif->exit_latch);
254     list_push_back(&all_udpifs, &udpif->list_node);
255     atomic_init(&udpif->n_flows, 0);
256     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
257     ovs_mutex_init(&udpif->n_flows_mutex);
258
259     return udpif;
260 }
261
262 void
263 udpif_destroy(struct udpif *udpif)
264 {
265     udpif_stop_threads(udpif);
266
267     list_remove(&udpif->list_node);
268     latch_destroy(&udpif->exit_latch);
269     seq_destroy(udpif->reval_seq);
270     seq_destroy(udpif->dump_seq);
271     ovs_mutex_destroy(&udpif->n_flows_mutex);
272     free(udpif);
273 }
274
275 /* Stops the handler and revalidator threads, must be enclosed in
276  * ovsrcu quiescent state unless when destroying udpif. */
277 static void
278 udpif_stop_threads(struct udpif *udpif)
279 {
280     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
281         size_t i;
282
283         latch_set(&udpif->exit_latch);
284
285         for (i = 0; i < udpif->n_handlers; i++) {
286             struct handler *handler = &udpif->handlers[i];
287
288             xpthread_join(handler->thread, NULL);
289         }
290
291         for (i = 0; i < udpif->n_revalidators; i++) {
292             xpthread_join(udpif->revalidators[i].thread, NULL);
293         }
294
295         for (i = 0; i < udpif->n_revalidators; i++) {
296             struct revalidator *revalidator = &udpif->revalidators[i];
297
298             /* Delete ukeys, and delete all flows from the datapath to prevent
299              * double-counting stats. */
300             revalidator_purge(revalidator);
301
302             hmap_destroy(&udpif->ukeys[i].hmap);
303             ovs_mutex_destroy(&udpif->ukeys[i].mutex);
304         }
305
306         latch_poll(&udpif->exit_latch);
307
308         xpthread_barrier_destroy(&udpif->reval_barrier);
309
310         free(udpif->revalidators);
311         udpif->revalidators = NULL;
312         udpif->n_revalidators = 0;
313
314         free(udpif->handlers);
315         udpif->handlers = NULL;
316         udpif->n_handlers = 0;
317
318         free(udpif->ukeys);
319         udpif->ukeys = NULL;
320     }
321 }
322
323 /* Starts the handler and revalidator threads, must be enclosed in
324  * ovsrcu quiescent state. */
325 static void
326 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
327                     size_t n_revalidators)
328 {
329     if (udpif && n_handlers && n_revalidators) {
330         size_t i;
331
332         udpif->n_handlers = n_handlers;
333         udpif->n_revalidators = n_revalidators;
334
335         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
336         for (i = 0; i < udpif->n_handlers; i++) {
337             struct handler *handler = &udpif->handlers[i];
338
339             handler->udpif = udpif;
340             handler->handler_id = i;
341             handler->thread = ovs_thread_create(
342                 "handler", udpif_upcall_handler, handler);
343         }
344
345         xpthread_barrier_init(&udpif->reval_barrier, NULL,
346                               udpif->n_revalidators);
347         udpif->reval_exit = false;
348         udpif->revalidators = xzalloc(udpif->n_revalidators
349                                       * sizeof *udpif->revalidators);
350         udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
351         for (i = 0; i < udpif->n_revalidators; i++) {
352             struct revalidator *revalidator = &udpif->revalidators[i];
353
354             revalidator->udpif = udpif;
355             hmap_init(&udpif->ukeys[i].hmap);
356             ovs_mutex_init(&udpif->ukeys[i].mutex);
357             revalidator->ukeys = &udpif->ukeys[i].hmap;
358             revalidator->thread = ovs_thread_create(
359                 "revalidator", udpif_revalidator, revalidator);
360         }
361     }
362 }
363
364 /* Tells 'udpif' how many threads it should use to handle upcalls.
365  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
366  * datapath handle must have packet reception enabled before starting
367  * threads. */
368 void
369 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
370                   size_t n_revalidators)
371 {
372     ovs_assert(udpif);
373     ovs_assert(n_handlers && n_revalidators);
374
375     ovsrcu_quiesce_start();
376     if (udpif->n_handlers != n_handlers
377         || udpif->n_revalidators != n_revalidators) {
378         udpif_stop_threads(udpif);
379     }
380
381     if (!udpif->handlers && !udpif->revalidators) {
382         int error;
383
384         error = dpif_handlers_set(udpif->dpif, n_handlers);
385         if (error) {
386             VLOG_ERR("failed to configure handlers in dpif %s: %s",
387                      dpif_name(udpif->dpif), ovs_strerror(error));
388             return;
389         }
390
391         udpif_start_threads(udpif, n_handlers, n_revalidators);
392     }
393     ovsrcu_quiesce_end();
394 }
395
396 /* Waits for all ongoing upcall translations to complete.  This ensures that
397  * there are no transient references to any removed ofprotos (or other
398  * objects).  In particular, this should be called after an ofproto is removed
399  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
400 void
401 udpif_synchronize(struct udpif *udpif)
402 {
403     /* This is stronger than necessary.  It would be sufficient to ensure
404      * (somehow) that each handler and revalidator thread had passed through
405      * its main loop once. */
406     size_t n_handlers = udpif->n_handlers;
407     size_t n_revalidators = udpif->n_revalidators;
408
409     ovsrcu_quiesce_start();
410     udpif_stop_threads(udpif);
411     udpif_start_threads(udpif, n_handlers, n_revalidators);
412     ovsrcu_quiesce_end();
413 }
414
415 /* Notifies 'udpif' that something changed which may render previous
416  * xlate_actions() results invalid. */
417 void
418 udpif_revalidate(struct udpif *udpif)
419 {
420     seq_change(udpif->reval_seq);
421 }
422
423 /* Returns a seq which increments every time 'udpif' pulls stats from the
424  * datapath.  Callers can use this to get a sense of when might be a good time
425  * to do periodic work which relies on relatively up to date statistics. */
426 struct seq *
427 udpif_dump_seq(struct udpif *udpif)
428 {
429     return udpif->dump_seq;
430 }
431
432 void
433 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
434 {
435     size_t i;
436
437     simap_increase(usage, "handlers", udpif->n_handlers);
438
439     simap_increase(usage, "revalidators", udpif->n_revalidators);
440     for (i = 0; i < udpif->n_revalidators; i++) {
441         ovs_mutex_lock(&udpif->ukeys[i].mutex);
442         simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
443         ovs_mutex_unlock(&udpif->ukeys[i].mutex);
444     }
445 }
446
447 /* Remove flows from a single datapath. */
448 void
449 udpif_flush(struct udpif *udpif)
450 {
451     size_t n_handlers, n_revalidators;
452
453     n_handlers = udpif->n_handlers;
454     n_revalidators = udpif->n_revalidators;
455
456     ovsrcu_quiesce_start();
457
458     udpif_stop_threads(udpif);
459     dpif_flow_flush(udpif->dpif);
460     udpif_start_threads(udpif, n_handlers, n_revalidators);
461
462     ovsrcu_quiesce_end();
463 }
464
465 /* Removes all flows from all datapaths. */
466 static void
467 udpif_flush_all_datapaths(void)
468 {
469     struct udpif *udpif;
470
471     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
472         udpif_flush(udpif);
473     }
474 }
475
476 \f
477 static unsigned long
478 udpif_get_n_flows(struct udpif *udpif)
479 {
480     long long int time, now;
481     unsigned long flow_count;
482
483     now = time_msec();
484     atomic_read(&udpif->n_flows_timestamp, &time);
485     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
486         struct dpif_dp_stats stats;
487
488         atomic_store(&udpif->n_flows_timestamp, now);
489         dpif_get_dp_stats(udpif->dpif, &stats);
490         flow_count = stats.n_flows;
491         atomic_store(&udpif->n_flows, flow_count);
492         ovs_mutex_unlock(&udpif->n_flows_mutex);
493     } else {
494         atomic_read(&udpif->n_flows, &flow_count);
495     }
496     return flow_count;
497 }
498
499 /* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH
500  * upcalls from dpif, processes the batch and installs corresponding flows
501  * in dpif. */
502 static void *
503 udpif_upcall_handler(void *arg)
504 {
505     struct handler *handler = arg;
506     struct udpif *udpif = handler->udpif;
507
508     while (!latch_is_set(&handler->udpif->exit_latch)) {
509         struct upcall upcalls[UPCALL_MAX_BATCH];
510         size_t n_upcalls, i;
511
512         n_upcalls = read_upcalls(handler, upcalls);
513         if (!n_upcalls) {
514             dpif_recv_wait(udpif->dpif, handler->handler_id);
515             latch_wait(&udpif->exit_latch);
516             poll_block();
517         } else {
518             handle_upcalls(handler, upcalls, n_upcalls);
519
520             for (i = 0; i < n_upcalls; i++) {
521                 xlate_out_uninit(&upcalls[i].xout);
522                 ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
523                 ofpbuf_uninit(&upcalls[i].upcall_buf);
524             }
525         }
526         coverage_clear();
527     }
528
529     return NULL;
530 }
531
532 static void *
533 udpif_revalidator(void *arg)
534 {
535     /* Used by all revalidators. */
536     struct revalidator *revalidator = arg;
537     struct udpif *udpif = revalidator->udpif;
538     bool leader = revalidator == &udpif->revalidators[0];
539
540     /* Used only by the leader. */
541     long long int start_time = 0;
542     uint64_t last_reval_seq = 0;
543     unsigned int flow_limit = 0;
544     size_t n_flows = 0;
545
546     revalidator->id = ovsthread_id_self();
547     for (;;) {
548         if (leader) {
549             uint64_t reval_seq;
550
551             reval_seq = seq_read(udpif->reval_seq);
552             udpif->need_revalidate = last_reval_seq != reval_seq;
553             last_reval_seq = reval_seq;
554
555             n_flows = udpif_get_n_flows(udpif);
556             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
557             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
558
559             /* Only the leader checks the exit latch to prevent a race where
560              * some threads think it's true and exit and others think it's
561              * false and block indefinitely on the reval_barrier */
562             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
563
564             start_time = time_msec();
565             if (!udpif->reval_exit) {
566                 udpif->dump = dpif_flow_dump_create(udpif->dpif);
567             }
568         }
569
570         /* Wait for the leader to start the flow dump. */
571         xpthread_barrier_wait(&udpif->reval_barrier);
572         if (udpif->reval_exit) {
573             break;
574         }
575         revalidate(revalidator);
576
577         /* Wait for all flows to have been dumped before we garbage collect. */
578         xpthread_barrier_wait(&udpif->reval_barrier);
579         revalidator_sweep(revalidator);
580
581         /* Wait for all revalidators to finish garbage collection. */
582         xpthread_barrier_wait(&udpif->reval_barrier);
583
584         if (leader) {
585             long long int duration;
586
587             dpif_flow_dump_destroy(udpif->dump);
588             seq_change(udpif->dump_seq);
589
590             duration = MAX(time_msec() - start_time, 1);
591             atomic_read(&udpif->flow_limit, &flow_limit);
592             udpif->dump_duration = duration;
593             if (duration > 2000) {
594                 flow_limit /= duration / 1000;
595             } else if (duration > 1300) {
596                 flow_limit = flow_limit * 3 / 4;
597             } else if (duration < 1000 && n_flows > 2000
598                        && flow_limit < n_flows * 1000 / duration) {
599                 flow_limit += 1000;
600             }
601             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
602             atomic_store(&udpif->flow_limit, flow_limit);
603
604             if (duration > 2000) {
605                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
606                           duration);
607             }
608
609             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
610             seq_wait(udpif->reval_seq, last_reval_seq);
611             latch_wait(&udpif->exit_latch);
612             poll_block();
613         }
614     }
615
616     return NULL;
617 }
618 \f
619 static enum upcall_type
620 classify_upcall(const struct upcall *upcall)
621 {
622     const struct dpif_upcall *dpif_upcall = &upcall->dpif_upcall;
623     union user_action_cookie cookie;
624     size_t userdata_len;
625
626     /* First look at the upcall type. */
627     switch (dpif_upcall->type) {
628     case DPIF_UC_ACTION:
629         break;
630
631     case DPIF_UC_MISS:
632         return MISS_UPCALL;
633
634     case DPIF_N_UC_TYPES:
635     default:
636         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
637                      dpif_upcall->type);
638         return BAD_UPCALL;
639     }
640
641     /* "action" upcalls need a closer look. */
642     if (!dpif_upcall->userdata) {
643         VLOG_WARN_RL(&rl, "action upcall missing cookie");
644         return BAD_UPCALL;
645     }
646     userdata_len = nl_attr_get_size(dpif_upcall->userdata);
647     if (userdata_len < sizeof cookie.type
648         || userdata_len > sizeof cookie) {
649         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
650                      userdata_len);
651         return BAD_UPCALL;
652     }
653     memset(&cookie, 0, sizeof cookie);
654     memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
655     if (userdata_len == MAX(8, sizeof cookie.sflow)
656         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
657         return SFLOW_UPCALL;
658     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
659                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
660         return MISS_UPCALL;
661     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
662                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
663         return FLOW_SAMPLE_UPCALL;
664     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
665                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
666         return IPFIX_UPCALL;
667     } else {
668         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
669                      " and size %"PRIuSIZE, cookie.type, userdata_len);
670         return BAD_UPCALL;
671     }
672 }
673
674 /* Calculates slow path actions for 'xout'.  'buf' must statically be
675  * initialized with at least 128 bytes of space. */
676 static void
677 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
678                   struct flow *flow, odp_port_t odp_in_port,
679                   struct ofpbuf *buf)
680 {
681     union user_action_cookie cookie;
682     odp_port_t port;
683     uint32_t pid;
684
685     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
686     cookie.slow_path.unused = 0;
687     cookie.slow_path.reason = xout->slow;
688
689     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
690         ? ODPP_NONE
691         : odp_in_port;
692     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
693     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
694 }
695
696 static void
697 upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
698             struct ofproto_dpif *ofproto, struct dpif_upcall *dupcall,
699             odp_port_t odp_in_port)
700 {
701     struct pkt_metadata md = pkt_metadata_from_flow(flow);
702     struct xlate_in xin;
703
704     flow_extract(packet, &md, &upcall->flow);
705
706     upcall->ofproto = ofproto;
707     upcall->key = dupcall->key;
708     upcall->key_len = dupcall->key_len;
709     upcall->upcall_type = dupcall->type;
710     upcall->stats.n_packets = 1;
711     upcall->stats.n_bytes = ofpbuf_size(packet);
712     upcall->stats.used = time_msec();
713     upcall->stats.tcp_flags = ntohs(upcall->flow.tcp_flags);
714     upcall->odp_in_port = odp_in_port;
715
716     xlate_in_init(&xin, upcall->ofproto, &upcall->flow, NULL,
717                   upcall->stats.tcp_flags, packet);
718
719     if (upcall->upcall_type == DPIF_UC_MISS) {
720         xin.resubmit_stats = &upcall->stats;
721     } else {
722         /* For non-miss upcalls, there's a flow in the datapath which this
723          * packet was accounted to.  Presumably the revalidators will deal
724          * with pushing its stats eventually. */
725     }
726
727     xlate_actions(&xin, &upcall->xout);
728 }
729
730 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
731  * read. */
732 static size_t
733 read_upcalls(struct handler *handler,
734              struct upcall upcalls[UPCALL_MAX_BATCH])
735 {
736     struct udpif *udpif = handler->udpif;
737     size_t i;
738     size_t n_upcalls = 0;
739
740     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
741     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
742         struct upcall *upcall = &upcalls[n_upcalls];
743         struct dpif_upcall *dupcall;
744         struct ofpbuf *packet;
745         struct ofproto_dpif *ofproto;
746         struct dpif_sflow *sflow;
747         struct dpif_ipfix *ipfix;
748         struct flow flow;
749         enum upcall_type type;
750         odp_port_t odp_in_port;
751         int error;
752
753         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
754                         sizeof upcall->upcall_stub);
755         error = dpif_recv(udpif->dpif, handler->handler_id,
756                           &upcall->dpif_upcall, &upcall->upcall_buf);
757         if (error) {
758             ofpbuf_uninit(&upcall->upcall_buf);
759             break;
760         }
761
762         dupcall = &upcall->dpif_upcall;
763         packet = &dupcall->packet;
764         error = xlate_receive(udpif->backer, packet, dupcall->key,
765                               dupcall->key_len, &flow,
766                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
767         if (error) {
768             if (error == ENODEV) {
769                 /* Received packet on datapath port for which we couldn't
770                  * associate an ofproto.  This can happen if a port is removed
771                  * while traffic is being received.  Print a rate-limited
772                  * message in case it happens frequently.  Install a drop flow
773                  * so that future packets of the flow are inexpensively dropped
774                  * in the kernel. */
775                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
776                              "port %"PRIu32, odp_in_port);
777                 dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
778                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
779                               NULL);
780             }
781             goto destroy_upcall;
782         }
783
784         type = classify_upcall(upcall);
785         if (type == MISS_UPCALL) {
786             upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
787             n_upcalls++;
788             continue;
789         }
790
791         switch (type) {
792         case SFLOW_UPCALL:
793             if (sflow) {
794                 union user_action_cookie cookie;
795
796                 memset(&cookie, 0, sizeof cookie);
797                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
798                        sizeof cookie.sflow);
799                 dpif_sflow_received(sflow, packet, &flow, odp_in_port,
800                                     &cookie);
801             }
802             break;
803         case IPFIX_UPCALL:
804             if (ipfix) {
805                 dpif_ipfix_bridge_sample(ipfix, packet, &flow);
806             }
807             break;
808         case FLOW_SAMPLE_UPCALL:
809             if (ipfix) {
810                 union user_action_cookie cookie;
811
812                 memset(&cookie, 0, sizeof cookie);
813                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
814                        sizeof cookie.flow_sample);
815
816                 /* The flow reflects exactly the contents of the packet.
817                  * Sample the packet using it. */
818                 dpif_ipfix_flow_sample(ipfix, packet, &flow,
819                                        cookie.flow_sample.collector_set_id,
820                                        cookie.flow_sample.probability,
821                                        cookie.flow_sample.obs_domain_id,
822                                        cookie.flow_sample.obs_point_id);
823             }
824             break;
825         case BAD_UPCALL:
826             break;
827         case MISS_UPCALL:
828             OVS_NOT_REACHED();
829         }
830
831         dpif_ipfix_unref(ipfix);
832         dpif_sflow_unref(sflow);
833
834 destroy_upcall:
835         ofpbuf_uninit(&upcall->dpif_upcall.packet);
836         ofpbuf_uninit(&upcall->upcall_buf);
837     }
838
839     return n_upcalls;
840 }
841
842 static void
843 handle_upcalls(struct handler *handler, struct upcall *upcalls,
844                size_t n_upcalls)
845 {
846     struct udpif *udpif = handler->udpif;
847     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
848     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
849     size_t n_ops, i;
850     unsigned int flow_limit;
851     bool fail_open, may_put;
852
853     atomic_read(&udpif->flow_limit, &flow_limit);
854     may_put = udpif_get_n_flows(udpif) < flow_limit;
855
856     /* Handle the packets individually in order of arrival.
857      *
858      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
859      *     processes received packets for these protocols.
860      *
861      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
862      *     controller.
863      *
864      * The loop fills 'ops' with an array of operations to execute in the
865      * datapath. */
866     fail_open = false;
867     n_ops = 0;
868     for (i = 0; i < n_upcalls; i++) {
869         struct upcall *upcall = &upcalls[i];
870         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
871         struct dpif_op *op;
872
873         fail_open = fail_open || upcall->xout.fail_open;
874
875         if (upcall->flow.in_port.ofp_port
876             != vsp_realdev_to_vlandev(upcall->ofproto,
877                                       upcall->flow.in_port.ofp_port,
878                                       upcall->flow.vlan_tci)) {
879             /* This packet was received on a VLAN splinter port.  We
880              * added a VLAN to the packet to make the packet resemble
881              * the flow, but the actions were composed assuming that
882              * the packet contained no VLAN.  So, we must remove the
883              * VLAN header from the packet before trying to execute the
884              * actions. */
885             if (ofpbuf_size(&upcall->xout.odp_actions)) {
886                 eth_pop_vlan(packet);
887             }
888
889             /* Remove the flow vlan tags inserted by vlan splinter logic
890              * to ensure megaflow masks generated match the data path flow. */
891             upcall->flow.vlan_tci = 0;
892         }
893
894         /* Do not install a flow into the datapath if:
895          *
896          *    - The datapath already has too many flows.
897          *
898          *    - We received this packet via some flow installed in the kernel
899          *      already. */
900         if (may_put
901             && upcall->dpif_upcall.type == DPIF_UC_MISS) {
902             struct ofpbuf mask;
903             bool megaflow;
904
905             atomic_read(&enable_megaflows, &megaflow);
906             ofpbuf_use_stack(&mask, &upcall->mask_buf, sizeof upcall->mask_buf);
907             if (megaflow) {
908                 size_t max_mpls;
909                 bool recirc;
910
911                 recirc = ofproto_dpif_get_enable_recirc(upcall->ofproto);
912                 max_mpls = ofproto_dpif_get_max_mpls_depth(upcall->ofproto);
913                 odp_flow_key_from_mask(&mask, &upcall->xout.wc.masks,
914                                        &upcall->flow, UINT32_MAX, max_mpls,
915                                        recirc);
916             }
917
918             op = &ops[n_ops++];
919             op->type = DPIF_OP_FLOW_PUT;
920             op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
921             op->u.flow_put.key = upcall->key;
922             op->u.flow_put.key_len = upcall->key_len;
923             op->u.flow_put.mask = ofpbuf_data(&mask);
924             op->u.flow_put.mask_len = ofpbuf_size(&mask);
925             op->u.flow_put.stats = NULL;
926
927             if (!upcall->xout.slow) {
928                 op->u.flow_put.actions = ofpbuf_data(&upcall->xout.odp_actions);
929                 op->u.flow_put.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
930             } else {
931                 struct ofpbuf buf;
932
933                 ofpbuf_use_stack(&buf, upcall->slow_path_buf,
934                                  sizeof upcall->slow_path_buf);
935                 compose_slow_path(udpif, &upcall->xout, &upcall->flow,
936                                   upcall->odp_in_port, &buf);
937                 op->u.flow_put.actions = ofpbuf_data(&buf);
938                 op->u.flow_put.actions_len = ofpbuf_size(&buf);
939             }
940         }
941
942         if (ofpbuf_size(&upcall->xout.odp_actions)) {
943
944             op = &ops[n_ops++];
945             op->type = DPIF_OP_EXECUTE;
946             op->u.execute.packet = packet;
947             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
948                                     &op->u.execute.md);
949             op->u.execute.actions = ofpbuf_data(&upcall->xout.odp_actions);
950             op->u.execute.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
951             op->u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
952         }
953     }
954
955     /* Special case for fail-open mode.
956      *
957      * If we are in fail-open mode, but we are connected to a controller too,
958      * then we should send the packet up to the controller in the hope that it
959      * will try to set up a flow and thereby allow us to exit fail-open.
960      *
961      * See the top-level comment in fail-open.c for more information.
962      *
963      * Copy packets before they are modified by execution. */
964     if (fail_open) {
965         for (i = 0; i < n_upcalls; i++) {
966             struct upcall *upcall = &upcalls[i];
967             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
968             struct ofproto_packet_in *pin;
969
970             pin = xmalloc(sizeof *pin);
971             pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
972             pin->up.packet_len = ofpbuf_size(packet);
973             pin->up.reason = OFPR_NO_MATCH;
974             pin->up.table_id = 0;
975             pin->up.cookie = OVS_BE64_MAX;
976             flow_get_metadata(&upcall->flow, &pin->up.fmd);
977             pin->send_len = 0; /* Not used for flow table misses. */
978             pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
979             ofproto_dpif_send_packet_in(upcall->ofproto, pin);
980         }
981     }
982
983     /* Execute batch. */
984     for (i = 0; i < n_ops; i++) {
985         opsp[i] = &ops[i];
986     }
987     dpif_operate(udpif->dpif, opsp, n_ops);
988 }
989
990 /* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. */
991 static struct udpif_key *
992 ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
993             uint32_t hash)
994     OVS_REQUIRES(udpif->ukeys->mutex)
995 {
996     struct udpif_key *ukey;
997     struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
998
999     HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
1000         if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
1001             return ukey;
1002         }
1003     }
1004     return NULL;
1005 }
1006
1007 /* Creates a ukey for 'key' and 'key_len', returning it with ukey->mutex in
1008  * a locked state. */
1009 static struct udpif_key *
1010 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
1011     OVS_NO_THREAD_SAFETY_ANALYSIS
1012 {
1013     struct udpif_key *ukey = xmalloc(sizeof *ukey);
1014
1015     ovs_mutex_init(&ukey->mutex);
1016     ukey->key = (struct nlattr *) &ukey->key_buf;
1017     memcpy(&ukey->key_buf, key, key_len);
1018     ukey->key_len = key_len;
1019
1020     ovs_mutex_lock(&ukey->mutex);
1021     ukey->mark = false;
1022     ukey->flow_exists = true;
1023     ukey->created = used ? used : time_msec();
1024     memset(&ukey->stats, 0, sizeof ukey->stats);
1025     ukey->xcache = NULL;
1026
1027     return ukey;
1028 }
1029
1030 /* Searches for a ukey in 'udpif->ukeys' that matches 'key' and 'key_len' and
1031  * attempts to lock the ukey. If the ukey does not exist, create it.
1032  *
1033  * Returns true on success, setting *result to the matching ukey and returning
1034  * it in a locked state. Otherwise, returns false and clears *result. */
1035 static bool
1036 ukey_acquire(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1037              long long int used, struct udpif_key **result)
1038     OVS_TRY_LOCK(true, (*result)->mutex)
1039 {
1040     struct udpif_key *ukey;
1041     uint32_t hash, idx;
1042     bool locked = false;
1043
1044     hash = hash_bytes(key, key_len, udpif->secret);
1045     idx = hash % udpif->n_revalidators;
1046
1047     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1048     ukey = ukey_lookup(udpif, key, key_len, hash);
1049     if (!ukey) {
1050         ukey = ukey_create(key, key_len, used);
1051         hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
1052         locked = true;
1053     } else if (!ovs_mutex_trylock(&ukey->mutex)) {
1054         locked = true;
1055     }
1056     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1057
1058     if (locked) {
1059         *result = ukey;
1060     } else {
1061         *result = NULL;
1062     }
1063     return locked;
1064 }
1065
1066 static void
1067 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
1068     OVS_NO_THREAD_SAFETY_ANALYSIS
1069 {
1070     if (revalidator) {
1071         hmap_remove(revalidator->ukeys, &ukey->hmap_node);
1072     }
1073     xlate_cache_delete(ukey->xcache);
1074     ovs_mutex_destroy(&ukey->mutex);
1075     free(ukey);
1076 }
1077
1078 static bool
1079 should_revalidate(uint64_t packets, long long int used)
1080 {
1081     long long int metric, now, duration;
1082
1083     /* Calculate the mean time between seeing these packets. If this
1084      * exceeds the threshold, then delete the flow rather than performing
1085      * costly revalidation for flows that aren't being hit frequently.
1086      *
1087      * This is targeted at situations where the dump_duration is high (~1s),
1088      * and revalidation is triggered by a call to udpif_revalidate(). In
1089      * these situations, revalidation of all flows causes fluctuations in the
1090      * flow_limit due to the interaction with the dump_duration and max_idle.
1091      * This tends to result in deletion of low-throughput flows anyway, so
1092      * skip the revalidation and just delete those flows. */
1093     packets = MAX(packets, 1);
1094     now = MAX(used, time_msec());
1095     duration = now - used;
1096     metric = duration / packets;
1097
1098     if (metric > 200) {
1099         return false;
1100     }
1101     return true;
1102 }
1103
1104 static bool
1105 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1106                 const struct dpif_flow *f)
1107     OVS_REQUIRES(ukey->mutex)
1108 {
1109     uint64_t slow_path_buf[128 / 8];
1110     struct xlate_out xout, *xoutp;
1111     struct netflow *netflow;
1112     struct ofproto_dpif *ofproto;
1113     struct dpif_flow_stats push;
1114     struct ofpbuf xout_actions;
1115     struct flow flow, dp_mask;
1116     uint32_t *dp32, *xout32;
1117     odp_port_t odp_in_port;
1118     struct xlate_in xin;
1119     long long int last_used;
1120     int error;
1121     size_t i;
1122     bool may_learn, ok;
1123
1124     ok = false;
1125     xoutp = NULL;
1126     netflow = NULL;
1127
1128     last_used = ukey->stats.used;
1129     push.used = f->stats.used;
1130     push.tcp_flags = f->stats.tcp_flags;
1131     push.n_packets = (f->stats.n_packets > ukey->stats.n_packets
1132                       ? f->stats.n_packets - ukey->stats.n_packets
1133                       : 0);
1134     push.n_bytes = (f->stats.n_bytes > ukey->stats.n_bytes
1135                     ? f->stats.n_bytes - ukey->stats.n_bytes
1136                     : 0);
1137
1138     if (udpif->need_revalidate && last_used
1139         && !should_revalidate(push.n_packets, last_used)) {
1140         ok = false;
1141         goto exit;
1142     }
1143
1144     /* We will push the stats, so update the ukey stats cache. */
1145     ukey->stats = f->stats;
1146     if (!push.n_packets && !udpif->need_revalidate) {
1147         ok = true;
1148         goto exit;
1149     }
1150
1151     may_learn = push.n_packets > 0;
1152     if (ukey->xcache && !udpif->need_revalidate) {
1153         xlate_push_stats(ukey->xcache, may_learn, &push);
1154         ok = true;
1155         goto exit;
1156     }
1157
1158     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
1159                           &ofproto, NULL, NULL, &netflow, &odp_in_port);
1160     if (error) {
1161         goto exit;
1162     }
1163
1164     if (udpif->need_revalidate) {
1165         xlate_cache_clear(ukey->xcache);
1166     }
1167     if (!ukey->xcache) {
1168         ukey->xcache = xlate_cache_new();
1169     }
1170
1171     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
1172     xin.resubmit_stats = push.n_packets ? &push : NULL;
1173     xin.xcache = ukey->xcache;
1174     xin.may_learn = may_learn;
1175     xin.skip_wildcards = !udpif->need_revalidate;
1176     xlate_actions(&xin, &xout);
1177     xoutp = &xout;
1178
1179     if (!udpif->need_revalidate) {
1180         ok = true;
1181         goto exit;
1182     }
1183
1184     if (!xout.slow) {
1185         ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
1186                          ofpbuf_size(&xout.odp_actions));
1187     } else {
1188         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
1189         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
1190     }
1191
1192     if (f->actions_len != ofpbuf_size(&xout_actions)
1193         || memcmp(ofpbuf_data(&xout_actions), f->actions, f->actions_len)) {
1194         goto exit;
1195     }
1196
1197     if (odp_flow_key_to_mask(f->mask, f->mask_len, &dp_mask, &flow)
1198         == ODP_FIT_ERROR) {
1199         goto exit;
1200     }
1201
1202     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1203      * directly check that the masks are the same.  Instead we check that the
1204      * mask in the kernel is more specific i.e. less wildcarded, than what
1205      * we've calculated here.  This guarantees we don't catch any packets we
1206      * shouldn't with the megaflow. */
1207     dp32 = (uint32_t *) &dp_mask;
1208     xout32 = (uint32_t *) &xout.wc.masks;
1209     for (i = 0; i < FLOW_U32S; i++) {
1210         if ((dp32[i] | xout32[i]) != dp32[i]) {
1211             goto exit;
1212         }
1213     }
1214     ok = true;
1215
1216 exit:
1217     if (netflow) {
1218         if (!ok) {
1219             netflow_expire(netflow, &flow);
1220             netflow_flow_clear(netflow, &flow);
1221         }
1222         netflow_unref(netflow);
1223     }
1224     xlate_out_uninit(xoutp);
1225     return ok;
1226 }
1227
1228 struct dump_op {
1229     struct udpif_key *ukey;
1230     struct dpif_flow_stats stats; /* Stats for 'op'. */
1231     struct dpif_op op;            /* Flow del operation. */
1232 };
1233
1234 static void
1235 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
1236              struct udpif_key *ukey)
1237 {
1238     op->ukey = ukey;
1239     op->op.type = DPIF_OP_FLOW_DEL;
1240     op->op.u.flow_del.key = key;
1241     op->op.u.flow_del.key_len = key_len;
1242     op->op.u.flow_del.stats = &op->stats;
1243 }
1244
1245 static void
1246 push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
1247 {
1248     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1249     size_t i;
1250
1251     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1252     for (i = 0; i < n_ops; i++) {
1253         opsp[i] = &ops[i].op;
1254     }
1255     dpif_operate(udpif->dpif, opsp, n_ops);
1256
1257     for (i = 0; i < n_ops; i++) {
1258         struct dump_op *op = &ops[i];
1259         struct dpif_flow_stats *push, *stats, push_buf;
1260
1261         stats = op->op.u.flow_del.stats;
1262         if (op->ukey) {
1263             push = &push_buf;
1264             ovs_mutex_lock(&op->ukey->mutex);
1265             push->used = MAX(stats->used, op->ukey->stats.used);
1266             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1267             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1268             push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1269             ovs_mutex_unlock(&op->ukey->mutex);
1270         } else {
1271             push = stats;
1272         }
1273
1274         if (push->n_packets || netflow_exists()) {
1275             struct ofproto_dpif *ofproto;
1276             struct netflow *netflow;
1277             struct flow flow;
1278             bool may_learn;
1279
1280             may_learn = push->n_packets > 0;
1281             if (op->ukey) {
1282                 ovs_mutex_lock(&op->ukey->mutex);
1283                 if (op->ukey->xcache) {
1284                     xlate_push_stats(op->ukey->xcache, may_learn, push);
1285                     ovs_mutex_unlock(&op->ukey->mutex);
1286                     continue;
1287                 }
1288                 ovs_mutex_unlock(&op->ukey->mutex);
1289             }
1290
1291             if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
1292                                op->op.u.flow_del.key_len, &flow, &ofproto,
1293                                NULL, NULL, &netflow, NULL)) {
1294                 struct xlate_in xin;
1295
1296                 xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
1297                               NULL);
1298                 xin.resubmit_stats = push->n_packets ? push : NULL;
1299                 xin.may_learn = may_learn;
1300                 xin.skip_wildcards = true;
1301                 xlate_actions_for_side_effects(&xin);
1302
1303                 if (netflow) {
1304                     netflow_expire(netflow, &flow);
1305                     netflow_flow_clear(netflow, &flow);
1306                     netflow_unref(netflow);
1307                 }
1308             }
1309         }
1310     }
1311 }
1312
1313 static void
1314 push_dump_ops(struct revalidator *revalidator,
1315               struct dump_op *ops, size_t n_ops)
1316 {
1317     int i;
1318
1319     push_dump_ops__(revalidator->udpif, ops, n_ops);
1320     for (i = 0; i < n_ops; i++) {
1321         ukey_delete(revalidator, ops[i].ukey);
1322     }
1323 }
1324
1325 static void
1326 revalidate(struct revalidator *revalidator)
1327 {
1328     struct udpif *udpif = revalidator->udpif;
1329     struct dpif_flow_dump_thread *dump_thread;
1330     unsigned int flow_limit;
1331
1332     atomic_read(&udpif->flow_limit, &flow_limit);
1333     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
1334     for (;;) {
1335         struct dump_op ops[REVALIDATE_MAX_BATCH];
1336         int n_ops = 0;
1337
1338         struct dpif_flow flows[REVALIDATE_MAX_BATCH];
1339         const struct dpif_flow *f;
1340         int n_dumped;
1341
1342         long long int max_idle;
1343         long long int now;
1344         size_t n_dp_flows;
1345         bool kill_them_all;
1346
1347         n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
1348         if (!n_dumped) {
1349             break;
1350         }
1351
1352         now = time_msec();
1353
1354         /* In normal operation we want to keep flows around until they have
1355          * been idle for 'ofproto_max_idle' milliseconds.  However:
1356          *
1357          *     - If the number of datapath flows climbs above 'flow_limit',
1358          *       drop that down to 100 ms to try to bring the flows down to
1359          *       the limit.
1360          *
1361          *     - If the number of datapath flows climbs above twice
1362          *       'flow_limit', delete all the datapath flows as an emergency
1363          *       measure.  (We reassess this condition for the next batch of
1364          *       datapath flows, so we will recover before all the flows are
1365          *       gone.) */
1366         n_dp_flows = udpif_get_n_flows(udpif);
1367         kill_them_all = n_dp_flows > flow_limit * 2;
1368         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
1369
1370         for (f = flows; f < &flows[n_dumped]; f++) {
1371             long long int used = f->stats.used;
1372             struct udpif_key *ukey;
1373             bool already_dumped, mark;
1374
1375             if (!ukey_acquire(udpif, f->key, f->key_len, used, &ukey)) {
1376                 /* We couldn't acquire the ukey. This means that
1377                  * another revalidator is processing this flow
1378                  * concurrently, so don't bother processing it. */
1379                 COVERAGE_INC(upcall_duplicate_flow);
1380                 continue;
1381             }
1382
1383             already_dumped = ukey->mark || !ukey->flow_exists;
1384             if (already_dumped) {
1385                 /* The flow has already been dumped and handled by another
1386                  * revalidator during this flow dump operation. Skip it. */
1387                 COVERAGE_INC(upcall_duplicate_flow);
1388                 ovs_mutex_unlock(&ukey->mutex);
1389                 continue;
1390             }
1391
1392             if (!used) {
1393                 used = ukey->created;
1394             }
1395             if (kill_them_all || (used && used < now - max_idle)) {
1396                 mark = false;
1397             } else {
1398                 mark = revalidate_ukey(udpif, ukey, f);
1399             }
1400             ukey->mark = ukey->flow_exists = mark;
1401
1402             if (!mark) {
1403                 dump_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
1404             }
1405             ovs_mutex_unlock(&ukey->mutex);
1406         }
1407
1408         if (n_ops) {
1409             push_dump_ops__(udpif, ops, n_ops);
1410         }
1411     }
1412     dpif_flow_dump_thread_destroy(dump_thread);
1413 }
1414
1415 static void
1416 revalidator_sweep__(struct revalidator *revalidator, bool purge)
1417     OVS_NO_THREAD_SAFETY_ANALYSIS
1418 {
1419     struct dump_op ops[REVALIDATE_MAX_BATCH];
1420     struct udpif_key *ukey, *next;
1421     size_t n_ops;
1422
1423     n_ops = 0;
1424
1425     /* During garbage collection, this revalidator completely owns its ukeys
1426      * map, and therefore doesn't need to do any locking. */
1427     HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
1428         if (!purge && ukey->mark) {
1429             ukey->mark = false;
1430         } else if (!ukey->flow_exists) {
1431             ukey_delete(revalidator, ukey);
1432         } else {
1433             struct dump_op *op = &ops[n_ops++];
1434
1435             /* If we have previously seen a flow in the datapath, but didn't
1436              * see it during the most recent dump, delete it. This allows us
1437              * to clean up the ukey and keep the statistics consistent. */
1438             dump_op_init(op, ukey->key, ukey->key_len, ukey);
1439             if (n_ops == REVALIDATE_MAX_BATCH) {
1440                 push_dump_ops(revalidator, ops, n_ops);
1441                 n_ops = 0;
1442             }
1443         }
1444     }
1445
1446     if (n_ops) {
1447         push_dump_ops(revalidator, ops, n_ops);
1448     }
1449 }
1450
1451 static void
1452 revalidator_sweep(struct revalidator *revalidator)
1453 {
1454     revalidator_sweep__(revalidator, false);
1455 }
1456
1457 static void
1458 revalidator_purge(struct revalidator *revalidator)
1459 {
1460     revalidator_sweep__(revalidator, true);
1461 }
1462 \f
1463 static void
1464 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
1465                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
1466 {
1467     struct ds ds = DS_EMPTY_INITIALIZER;
1468     struct udpif *udpif;
1469
1470     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1471         unsigned int flow_limit;
1472         size_t i;
1473
1474         atomic_read(&udpif->flow_limit, &flow_limit);
1475
1476         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
1477         ds_put_format(&ds, "\tflows         : (current %lu)"
1478             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
1479             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
1480         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
1481
1482         ds_put_char(&ds, '\n');
1483         for (i = 0; i < n_revalidators; i++) {
1484             struct revalidator *revalidator = &udpif->revalidators[i];
1485
1486             ovs_mutex_lock(&udpif->ukeys[i].mutex);
1487             ds_put_format(&ds, "\t%u: (keys %"PRIuSIZE")\n",
1488                           revalidator->id, hmap_count(&udpif->ukeys[i].hmap));
1489             ovs_mutex_unlock(&udpif->ukeys[i].mutex);
1490         }
1491     }
1492
1493     unixctl_command_reply(conn, ds_cstr(&ds));
1494     ds_destroy(&ds);
1495 }
1496
1497 /* Disable using the megaflows.
1498  *
1499  * This command is only needed for advanced debugging, so it's not
1500  * documented in the man page. */
1501 static void
1502 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
1503                                  int argc OVS_UNUSED,
1504                                  const char *argv[] OVS_UNUSED,
1505                                  void *aux OVS_UNUSED)
1506 {
1507     atomic_store(&enable_megaflows, false);
1508     udpif_flush_all_datapaths();
1509     unixctl_command_reply(conn, "megaflows disabled");
1510 }
1511
1512 /* Re-enable using megaflows.
1513  *
1514  * This command is only needed for advanced debugging, so it's not
1515  * documented in the man page. */
1516 static void
1517 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
1518                                 int argc OVS_UNUSED,
1519                                 const char *argv[] OVS_UNUSED,
1520                                 void *aux OVS_UNUSED)
1521 {
1522     atomic_store(&enable_megaflows, true);
1523     udpif_flush_all_datapaths();
1524     unixctl_command_reply(conn, "megaflows enabled");
1525 }
1526
1527 /* Set the flow limit.
1528  *
1529  * This command is only needed for advanced debugging, so it's not
1530  * documented in the man page. */
1531 static void
1532 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
1533                               int argc OVS_UNUSED,
1534                               const char *argv[] OVS_UNUSED,
1535                               void *aux OVS_UNUSED)
1536 {
1537     struct ds ds = DS_EMPTY_INITIALIZER;
1538     struct udpif *udpif;
1539     unsigned int flow_limit = atoi(argv[1]);
1540
1541     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1542         atomic_store(&udpif->flow_limit, flow_limit);
1543     }
1544     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
1545     unixctl_command_reply(conn, ds_cstr(&ds));
1546     ds_destroy(&ds);
1547 }