ofproto: per-table statistics
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "dpif.h"
25 #include "dynamic-string.h"
26 #include "fail-open.h"
27 #include "guarded-list.h"
28 #include "latch.h"
29 #include "list.h"
30 #include "netlink.h"
31 #include "ofpbuf.h"
32 #include "ofproto-dpif-ipfix.h"
33 #include "ofproto-dpif-sflow.h"
34 #include "ofproto-dpif-xlate.h"
35 #include "ovs-rcu.h"
36 #include "packets.h"
37 #include "poll-loop.h"
38 #include "seq.h"
39 #include "unixctl.h"
40 #include "vlog.h"
41
42 #define MAX_QUEUE_LENGTH 512
43 #define UPCALL_MAX_BATCH 50
44 #define REVALIDATE_MAX_BATCH 50
45
46 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
47
48 COVERAGE_DEFINE(upcall_duplicate_flow);
49
50 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
51  * and possibly sets up a kernel flow as a cache. */
52 struct handler {
53     struct udpif *udpif;               /* Parent udpif. */
54     pthread_t thread;                  /* Thread ID. */
55     uint32_t handler_id;               /* Handler id. */
56 };
57
58 /* A thread that processes datapath flows, updates OpenFlow statistics, and
59  * updates or removes them if necessary. */
60 struct revalidator {
61     struct udpif *udpif;               /* Parent udpif. */
62     pthread_t thread;                  /* Thread ID. */
63     unsigned int id;                   /* ovsthread_id_self(). */
64     struct hmap *ukeys;                /* Points into udpif->ukeys for this
65                                           revalidator. Used for GC phase. */
66 };
67
68 /* An upcall handler for ofproto_dpif.
69  *
70  * udpif keeps records of two kind of logically separate units:
71  *
72  * upcall handling
73  * ---------------
74  *
75  *    - An array of 'struct handler's for upcall handling and flow
76  *      installation.
77  *
78  * flow revalidation
79  * -----------------
80  *
81  *    - Revalidation threads which read the datapath flow table and maintains
82  *      them.
83  */
84 struct udpif {
85     struct list list_node;             /* In all_udpifs list. */
86
87     struct dpif *dpif;                 /* Datapath handle. */
88     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
89
90     uint32_t secret;                   /* Random seed for upcall hash. */
91
92     struct handler *handlers;          /* Upcall handlers. */
93     size_t n_handlers;
94
95     struct revalidator *revalidators;  /* Flow revalidators. */
96     size_t n_revalidators;
97
98     struct latch exit_latch;           /* Tells child threads to exit. */
99
100     /* Revalidation. */
101     struct seq *reval_seq;             /* Incremented to force revalidation. */
102     bool need_revalidate;              /* As indicated by 'reval_seq'. */
103     bool reval_exit;                   /* Set by leader on 'exit_latch. */
104     pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
105     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
106     long long int dump_duration;       /* Duration of the last flow dump. */
107     struct seq *dump_seq;              /* Increments each dump iteration. */
108
109     /* There are 'n_revalidators' ukey hmaps. Each revalidator retains a
110      * reference to one of these for garbage collection.
111      *
112      * During the flow dump phase, revalidators insert into these with a random
113      * distribution. During the garbage collection phase, each revalidator
114      * takes care of garbage collecting one of these hmaps. */
115     struct {
116         struct ovs_mutex mutex;        /* Guards the following. */
117         struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
118     } *ukeys;
119
120     /* Datapath flow statistics. */
121     unsigned int max_n_flows;
122     unsigned int avg_n_flows;
123
124     /* Following fields are accessed and modified by different threads. */
125     atomic_uint flow_limit;            /* Datapath flow hard limit. */
126
127     /* n_flows_mutex prevents multiple threads updating these concurrently. */
128     atomic_ulong n_flows;           /* Number of flows in the datapath. */
129     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
130     struct ovs_mutex n_flows_mutex;
131 };
132
133 enum upcall_type {
134     BAD_UPCALL,                 /* Some kind of bug somewhere. */
135     MISS_UPCALL,                /* A flow miss.  */
136     SFLOW_UPCALL,               /* sFlow sample. */
137     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
138     IPFIX_UPCALL                /* Per-bridge sampling. */
139 };
140
141 struct upcall {
142     struct ofproto_dpif *ofproto;
143
144     struct flow flow;
145     const struct nlattr *key;
146     size_t key_len;
147     enum dpif_upcall_type upcall_type;
148     struct dpif_flow_stats stats;
149     odp_port_t odp_in_port;
150
151     uint64_t slow_path_buf[128 / 8];
152     struct odputil_keybuf mask_buf;
153
154     struct xlate_out xout;
155
156     /* Raw upcall plus data for keeping track of the memory backing it. */
157     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
158     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
159     uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
160 };
161
162 /* 'udpif_key's are responsible for tracking the little bit of state udpif
163  * needs to do flow expiration which can't be pulled directly from the
164  * datapath.  They may be created or maintained by any revalidator during
165  * the dump phase, but are owned by a single revalidator, and are destroyed
166  * by that revalidator during the garbage-collection phase.
167  *
168  * While some elements of a udpif_key are protected by a mutex, the ukey itself
169  * is not.  Therefore it is not safe to destroy a udpif_key except when all
170  * revalidators are in garbage collection phase, or they aren't running. */
171 struct udpif_key {
172     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
173
174     /* These elements are read only once created, and therefore aren't
175      * protected by a mutex. */
176     const struct nlattr *key;      /* Datapath flow key. */
177     size_t key_len;                /* Length of 'key'. */
178
179     struct ovs_mutex mutex;                   /* Guards the following. */
180     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
181     long long int created OVS_GUARDED;        /* Estimate of creation time. */
182     bool mark OVS_GUARDED;                    /* For mark and sweep garbage
183                                                  collection. */
184     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
185                                                  once. */
186
187     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
188                                                * are affected by this ukey.
189                                                * Used for stats and learning.*/
190     struct odputil_keybuf key_buf;            /* Memory for 'key'. */
191 };
192
193 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
194 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
195
196 static size_t read_upcalls(struct handler *,
197                            struct upcall upcalls[UPCALL_MAX_BATCH]);
198 static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
199 static void udpif_stop_threads(struct udpif *);
200 static void udpif_start_threads(struct udpif *, size_t n_handlers,
201                                 size_t n_revalidators);
202 static void *udpif_upcall_handler(void *);
203 static void *udpif_revalidator(void *);
204 static unsigned long udpif_get_n_flows(struct udpif *);
205 static void revalidate(struct revalidator *);
206 static void revalidator_sweep(struct revalidator *);
207 static void revalidator_purge(struct revalidator *);
208 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
209                                 const char *argv[], void *aux);
210 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
211                                              const char *argv[], void *aux);
212 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
213                                             const char *argv[], void *aux);
214 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
215                                             const char *argv[], void *aux);
216
217 static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
218                                      long long int used);
219 static void ukey_delete(struct revalidator *, struct udpif_key *);
220
221 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
222
223 struct udpif *
224 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
225 {
226     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
227     struct udpif *udpif = xzalloc(sizeof *udpif);
228
229     if (ovsthread_once_start(&once)) {
230         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
231                                  NULL);
232         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
233                                  upcall_unixctl_disable_megaflows, NULL);
234         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
235                                  upcall_unixctl_enable_megaflows, NULL);
236         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
237                                  upcall_unixctl_set_flow_limit, NULL);
238         ovsthread_once_done(&once);
239     }
240
241     udpif->dpif = dpif;
242     udpif->backer = backer;
243     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
244     udpif->secret = random_uint32();
245     udpif->reval_seq = seq_create();
246     udpif->dump_seq = seq_create();
247     latch_init(&udpif->exit_latch);
248     list_push_back(&all_udpifs, &udpif->list_node);
249     atomic_init(&udpif->n_flows, 0);
250     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
251     ovs_mutex_init(&udpif->n_flows_mutex);
252
253     return udpif;
254 }
255
256 void
257 udpif_destroy(struct udpif *udpif)
258 {
259     udpif_stop_threads(udpif);
260
261     list_remove(&udpif->list_node);
262     latch_destroy(&udpif->exit_latch);
263     seq_destroy(udpif->reval_seq);
264     seq_destroy(udpif->dump_seq);
265     ovs_mutex_destroy(&udpif->n_flows_mutex);
266     free(udpif);
267 }
268
269 /* Stops the handler and revalidator threads, must be enclosed in
270  * ovsrcu quiescent state unless when destroying udpif. */
271 static void
272 udpif_stop_threads(struct udpif *udpif)
273 {
274     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
275         size_t i;
276
277         latch_set(&udpif->exit_latch);
278
279         for (i = 0; i < udpif->n_handlers; i++) {
280             struct handler *handler = &udpif->handlers[i];
281
282             xpthread_join(handler->thread, NULL);
283         }
284
285         for (i = 0; i < udpif->n_revalidators; i++) {
286             xpthread_join(udpif->revalidators[i].thread, NULL);
287         }
288
289         for (i = 0; i < udpif->n_revalidators; i++) {
290             struct revalidator *revalidator = &udpif->revalidators[i];
291
292             /* Delete ukeys, and delete all flows from the datapath to prevent
293              * double-counting stats. */
294             revalidator_purge(revalidator);
295
296             hmap_destroy(&udpif->ukeys[i].hmap);
297             ovs_mutex_destroy(&udpif->ukeys[i].mutex);
298         }
299
300         latch_poll(&udpif->exit_latch);
301
302         xpthread_barrier_destroy(&udpif->reval_barrier);
303
304         free(udpif->revalidators);
305         udpif->revalidators = NULL;
306         udpif->n_revalidators = 0;
307
308         free(udpif->handlers);
309         udpif->handlers = NULL;
310         udpif->n_handlers = 0;
311
312         free(udpif->ukeys);
313         udpif->ukeys = NULL;
314     }
315 }
316
317 /* Starts the handler and revalidator threads, must be enclosed in
318  * ovsrcu quiescent state. */
319 static void
320 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
321                     size_t n_revalidators)
322 {
323     if (udpif && n_handlers && n_revalidators) {
324         size_t i;
325
326         udpif->n_handlers = n_handlers;
327         udpif->n_revalidators = n_revalidators;
328
329         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
330         for (i = 0; i < udpif->n_handlers; i++) {
331             struct handler *handler = &udpif->handlers[i];
332
333             handler->udpif = udpif;
334             handler->handler_id = i;
335             handler->thread = ovs_thread_create(
336                 "handler", udpif_upcall_handler, handler);
337         }
338
339         xpthread_barrier_init(&udpif->reval_barrier, NULL,
340                               udpif->n_revalidators);
341         udpif->reval_exit = false;
342         udpif->revalidators = xzalloc(udpif->n_revalidators
343                                       * sizeof *udpif->revalidators);
344         udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
345         for (i = 0; i < udpif->n_revalidators; i++) {
346             struct revalidator *revalidator = &udpif->revalidators[i];
347
348             revalidator->udpif = udpif;
349             hmap_init(&udpif->ukeys[i].hmap);
350             ovs_mutex_init(&udpif->ukeys[i].mutex);
351             revalidator->ukeys = &udpif->ukeys[i].hmap;
352             revalidator->thread = ovs_thread_create(
353                 "revalidator", udpif_revalidator, revalidator);
354         }
355     }
356 }
357
358 /* Tells 'udpif' how many threads it should use to handle upcalls.
359  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
360  * datapath handle must have packet reception enabled before starting
361  * threads. */
362 void
363 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
364                   size_t n_revalidators)
365 {
366     ovs_assert(udpif);
367     ovs_assert(n_handlers && n_revalidators);
368
369     ovsrcu_quiesce_start();
370     if (udpif->n_handlers != n_handlers
371         || udpif->n_revalidators != n_revalidators) {
372         udpif_stop_threads(udpif);
373     }
374
375     if (!udpif->handlers && !udpif->revalidators) {
376         int error;
377
378         error = dpif_handlers_set(udpif->dpif, n_handlers);
379         if (error) {
380             VLOG_ERR("failed to configure handlers in dpif %s: %s",
381                      dpif_name(udpif->dpif), ovs_strerror(error));
382             return;
383         }
384
385         udpif_start_threads(udpif, n_handlers, n_revalidators);
386     }
387     ovsrcu_quiesce_end();
388 }
389
390 /* Waits for all ongoing upcall translations to complete.  This ensures that
391  * there are no transient references to any removed ofprotos (or other
392  * objects).  In particular, this should be called after an ofproto is removed
393  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
394 void
395 udpif_synchronize(struct udpif *udpif)
396 {
397     /* This is stronger than necessary.  It would be sufficient to ensure
398      * (somehow) that each handler and revalidator thread had passed through
399      * its main loop once. */
400     size_t n_handlers = udpif->n_handlers;
401     size_t n_revalidators = udpif->n_revalidators;
402
403     ovsrcu_quiesce_start();
404     udpif_stop_threads(udpif);
405     udpif_start_threads(udpif, n_handlers, n_revalidators);
406     ovsrcu_quiesce_end();
407 }
408
409 /* Notifies 'udpif' that something changed which may render previous
410  * xlate_actions() results invalid. */
411 void
412 udpif_revalidate(struct udpif *udpif)
413 {
414     seq_change(udpif->reval_seq);
415 }
416
417 /* Returns a seq which increments every time 'udpif' pulls stats from the
418  * datapath.  Callers can use this to get a sense of when might be a good time
419  * to do periodic work which relies on relatively up to date statistics. */
420 struct seq *
421 udpif_dump_seq(struct udpif *udpif)
422 {
423     return udpif->dump_seq;
424 }
425
426 void
427 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
428 {
429     size_t i;
430
431     simap_increase(usage, "handlers", udpif->n_handlers);
432
433     simap_increase(usage, "revalidators", udpif->n_revalidators);
434     for (i = 0; i < udpif->n_revalidators; i++) {
435         ovs_mutex_lock(&udpif->ukeys[i].mutex);
436         simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
437         ovs_mutex_unlock(&udpif->ukeys[i].mutex);
438     }
439 }
440
441 /* Remove flows from a single datapath. */
442 void
443 udpif_flush(struct udpif *udpif)
444 {
445     size_t n_handlers, n_revalidators;
446
447     n_handlers = udpif->n_handlers;
448     n_revalidators = udpif->n_revalidators;
449
450     ovsrcu_quiesce_start();
451
452     udpif_stop_threads(udpif);
453     dpif_flow_flush(udpif->dpif);
454     udpif_start_threads(udpif, n_handlers, n_revalidators);
455
456     ovsrcu_quiesce_end();
457 }
458
459 /* Removes all flows from all datapaths. */
460 static void
461 udpif_flush_all_datapaths(void)
462 {
463     struct udpif *udpif;
464
465     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
466         udpif_flush(udpif);
467     }
468 }
469
470 \f
471 static unsigned long
472 udpif_get_n_flows(struct udpif *udpif)
473 {
474     long long int time, now;
475     unsigned long flow_count;
476
477     now = time_msec();
478     atomic_read(&udpif->n_flows_timestamp, &time);
479     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
480         struct dpif_dp_stats stats;
481
482         atomic_store(&udpif->n_flows_timestamp, now);
483         dpif_get_dp_stats(udpif->dpif, &stats);
484         flow_count = stats.n_flows;
485         atomic_store(&udpif->n_flows, flow_count);
486         ovs_mutex_unlock(&udpif->n_flows_mutex);
487     } else {
488         atomic_read(&udpif->n_flows, &flow_count);
489     }
490     return flow_count;
491 }
492
493 /* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH
494  * upcalls from dpif, processes the batch and installs corresponding flows
495  * in dpif. */
496 static void *
497 udpif_upcall_handler(void *arg)
498 {
499     struct handler *handler = arg;
500     struct udpif *udpif = handler->udpif;
501
502     while (!latch_is_set(&handler->udpif->exit_latch)) {
503         struct upcall upcalls[UPCALL_MAX_BATCH];
504         size_t n_upcalls, i;
505
506         n_upcalls = read_upcalls(handler, upcalls);
507         if (!n_upcalls) {
508             dpif_recv_wait(udpif->dpif, handler->handler_id);
509             latch_wait(&udpif->exit_latch);
510             poll_block();
511         } else {
512             handle_upcalls(handler, upcalls, n_upcalls);
513
514             for (i = 0; i < n_upcalls; i++) {
515                 xlate_out_uninit(&upcalls[i].xout);
516                 ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
517                 ofpbuf_uninit(&upcalls[i].upcall_buf);
518             }
519         }
520         coverage_clear();
521     }
522
523     return NULL;
524 }
525
526 static void *
527 udpif_revalidator(void *arg)
528 {
529     /* Used by all revalidators. */
530     struct revalidator *revalidator = arg;
531     struct udpif *udpif = revalidator->udpif;
532     bool leader = revalidator == &udpif->revalidators[0];
533
534     /* Used only by the leader. */
535     long long int start_time = 0;
536     uint64_t last_reval_seq = 0;
537     unsigned int flow_limit = 0;
538     size_t n_flows = 0;
539
540     revalidator->id = ovsthread_id_self();
541     for (;;) {
542         if (leader) {
543             uint64_t reval_seq;
544
545             reval_seq = seq_read(udpif->reval_seq);
546             udpif->need_revalidate = last_reval_seq != reval_seq;
547             last_reval_seq = reval_seq;
548
549             n_flows = udpif_get_n_flows(udpif);
550             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
551             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
552
553             /* Only the leader checks the exit latch to prevent a race where
554              * some threads think it's true and exit and others think it's
555              * false and block indefinitely on the reval_barrier */
556             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
557
558             start_time = time_msec();
559             if (!udpif->reval_exit) {
560                 udpif->dump = dpif_flow_dump_create(udpif->dpif);
561             }
562         }
563
564         /* Wait for the leader to start the flow dump. */
565         xpthread_barrier_wait(&udpif->reval_barrier);
566         if (udpif->reval_exit) {
567             break;
568         }
569         revalidate(revalidator);
570
571         /* Wait for all flows to have been dumped before we garbage collect. */
572         xpthread_barrier_wait(&udpif->reval_barrier);
573         revalidator_sweep(revalidator);
574
575         /* Wait for all revalidators to finish garbage collection. */
576         xpthread_barrier_wait(&udpif->reval_barrier);
577
578         if (leader) {
579             long long int duration;
580
581             dpif_flow_dump_destroy(udpif->dump);
582             seq_change(udpif->dump_seq);
583
584             duration = MAX(time_msec() - start_time, 1);
585             atomic_read(&udpif->flow_limit, &flow_limit);
586             udpif->dump_duration = duration;
587             if (duration > 2000) {
588                 flow_limit /= duration / 1000;
589             } else if (duration > 1300) {
590                 flow_limit = flow_limit * 3 / 4;
591             } else if (duration < 1000 && n_flows > 2000
592                        && flow_limit < n_flows * 1000 / duration) {
593                 flow_limit += 1000;
594             }
595             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
596             atomic_store(&udpif->flow_limit, flow_limit);
597
598             if (duration > 2000) {
599                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
600                           duration);
601             }
602
603             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
604             seq_wait(udpif->reval_seq, last_reval_seq);
605             latch_wait(&udpif->exit_latch);
606             poll_block();
607         }
608     }
609
610     return NULL;
611 }
612 \f
613 static enum upcall_type
614 classify_upcall(const struct upcall *upcall)
615 {
616     const struct dpif_upcall *dpif_upcall = &upcall->dpif_upcall;
617     union user_action_cookie cookie;
618     size_t userdata_len;
619
620     /* First look at the upcall type. */
621     switch (dpif_upcall->type) {
622     case DPIF_UC_ACTION:
623         break;
624
625     case DPIF_UC_MISS:
626         return MISS_UPCALL;
627
628     case DPIF_N_UC_TYPES:
629     default:
630         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
631                      dpif_upcall->type);
632         return BAD_UPCALL;
633     }
634
635     /* "action" upcalls need a closer look. */
636     if (!dpif_upcall->userdata) {
637         VLOG_WARN_RL(&rl, "action upcall missing cookie");
638         return BAD_UPCALL;
639     }
640     userdata_len = nl_attr_get_size(dpif_upcall->userdata);
641     if (userdata_len < sizeof cookie.type
642         || userdata_len > sizeof cookie) {
643         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
644                      userdata_len);
645         return BAD_UPCALL;
646     }
647     memset(&cookie, 0, sizeof cookie);
648     memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
649     if (userdata_len == MAX(8, sizeof cookie.sflow)
650         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
651         return SFLOW_UPCALL;
652     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
653                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
654         return MISS_UPCALL;
655     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
656                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
657         return FLOW_SAMPLE_UPCALL;
658     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
659                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
660         return IPFIX_UPCALL;
661     } else {
662         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
663                      " and size %"PRIuSIZE, cookie.type, userdata_len);
664         return BAD_UPCALL;
665     }
666 }
667
668 /* Calculates slow path actions for 'xout'.  'buf' must statically be
669  * initialized with at least 128 bytes of space. */
670 static void
671 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
672                   struct flow *flow, odp_port_t odp_in_port,
673                   struct ofpbuf *buf)
674 {
675     union user_action_cookie cookie;
676     odp_port_t port;
677     uint32_t pid;
678
679     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
680     cookie.slow_path.unused = 0;
681     cookie.slow_path.reason = xout->slow;
682
683     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
684         ? ODPP_NONE
685         : odp_in_port;
686     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
687     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
688 }
689
690 static void
691 upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
692             struct ofproto_dpif *ofproto, struct dpif_upcall *dupcall,
693             odp_port_t odp_in_port)
694 {
695     struct xlate_in xin;
696     struct pkt_metadata md = pkt_metadata_from_flow(flow);
697
698     flow_extract(packet, &md, &upcall->flow);
699
700     upcall->ofproto = ofproto;
701     upcall->key = dupcall->key;
702     upcall->key_len = dupcall->key_len;
703     upcall->upcall_type = dupcall->type;
704     upcall->stats.n_packets = 1;
705     upcall->stats.n_bytes = ofpbuf_size(packet);
706     upcall->stats.used = time_msec();
707     upcall->stats.tcp_flags = ntohs(upcall->flow.tcp_flags);
708     upcall->odp_in_port = odp_in_port;
709
710     xlate_in_init(&xin, upcall->ofproto, &upcall->flow, NULL,
711                   upcall->stats.tcp_flags, NULL);
712     xin.may_learn = true;
713
714     if (upcall->upcall_type == DPIF_UC_MISS) {
715         xin.resubmit_stats = &upcall->stats;
716     } else {
717         /* For non-miss upcalls, there's a flow in the datapath which this
718          * packet was accounted to.  Presumably the revalidators will deal
719          * with pushing its stats eventually. */
720     }
721
722     xlate_actions(&xin, &upcall->xout);
723 }
724
725 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
726  * read. */
727 static size_t
728 read_upcalls(struct handler *handler,
729              struct upcall upcalls[UPCALL_MAX_BATCH])
730 {
731     struct udpif *udpif = handler->udpif;
732     size_t i;
733     size_t n_upcalls = 0;
734
735     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
736     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
737         struct upcall *upcall = &upcalls[n_upcalls];
738         struct dpif_upcall *dupcall;
739         struct ofpbuf *packet;
740         struct ofproto_dpif *ofproto;
741         struct dpif_sflow *sflow;
742         struct dpif_ipfix *ipfix;
743         struct flow flow;
744         enum upcall_type type;
745         odp_port_t odp_in_port;
746         int error;
747
748         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
749                         sizeof upcall->upcall_stub);
750         error = dpif_recv(udpif->dpif, handler->handler_id,
751                           &upcall->dpif_upcall, &upcall->upcall_buf);
752         if (error) {
753             ofpbuf_uninit(&upcall->upcall_buf);
754             break;
755         }
756
757         dupcall = &upcall->dpif_upcall;
758         packet = &dupcall->packet;
759         error = xlate_receive(udpif->backer, packet, dupcall->key,
760                               dupcall->key_len, &flow,
761                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
762         if (error) {
763             if (error == ENODEV) {
764                 /* Received packet on datapath port for which we couldn't
765                  * associate an ofproto.  This can happen if a port is removed
766                  * while traffic is being received.  Print a rate-limited
767                  * message in case it happens frequently.  Install a drop flow
768                  * so that future packets of the flow are inexpensively dropped
769                  * in the kernel. */
770                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
771                              "port %"PRIu32, odp_in_port);
772                 dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
773                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
774                               NULL);
775             }
776             goto destroy_upcall;
777         }
778
779         type = classify_upcall(upcall);
780         if (type == MISS_UPCALL) {
781             upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
782             n_upcalls++;
783             continue;
784         }
785
786         switch (type) {
787         case SFLOW_UPCALL:
788             if (sflow) {
789                 union user_action_cookie cookie;
790
791                 memset(&cookie, 0, sizeof cookie);
792                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
793                        sizeof cookie.sflow);
794                 dpif_sflow_received(sflow, packet, &flow, odp_in_port,
795                                     &cookie);
796             }
797             break;
798         case IPFIX_UPCALL:
799             if (ipfix) {
800                 dpif_ipfix_bridge_sample(ipfix, packet, &flow);
801             }
802             break;
803         case FLOW_SAMPLE_UPCALL:
804             if (ipfix) {
805                 union user_action_cookie cookie;
806
807                 memset(&cookie, 0, sizeof cookie);
808                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
809                        sizeof cookie.flow_sample);
810
811                 /* The flow reflects exactly the contents of the packet.
812                  * Sample the packet using it. */
813                 dpif_ipfix_flow_sample(ipfix, packet, &flow,
814                                        cookie.flow_sample.collector_set_id,
815                                        cookie.flow_sample.probability,
816                                        cookie.flow_sample.obs_domain_id,
817                                        cookie.flow_sample.obs_point_id);
818             }
819             break;
820         case BAD_UPCALL:
821             break;
822         case MISS_UPCALL:
823             OVS_NOT_REACHED();
824         }
825
826         dpif_ipfix_unref(ipfix);
827         dpif_sflow_unref(sflow);
828
829 destroy_upcall:
830         ofpbuf_uninit(&upcall->dpif_upcall.packet);
831         ofpbuf_uninit(&upcall->upcall_buf);
832     }
833
834     return n_upcalls;
835 }
836
837 static void
838 handle_upcalls(struct handler *handler, struct upcall *upcalls,
839                size_t n_upcalls)
840 {
841     struct udpif *udpif = handler->udpif;
842     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
843     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
844     size_t n_ops, i;
845     unsigned int flow_limit;
846     bool fail_open, may_put;
847
848     atomic_read(&udpif->flow_limit, &flow_limit);
849     may_put = udpif_get_n_flows(udpif) < flow_limit;
850
851     /* Handle the packets individually in order of arrival.
852      *
853      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
854      *     processes received packets for these protocols.
855      *
856      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
857      *     controller.
858      *
859      * The loop fills 'ops' with an array of operations to execute in the
860      * datapath. */
861     fail_open = false;
862     n_ops = 0;
863     for (i = 0; i < n_upcalls; i++) {
864         struct upcall *upcall = &upcalls[i];
865         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
866         struct dpif_op *op;
867
868         fail_open = fail_open || upcall->xout.fail_open;
869
870         if (upcall->xout.slow) {
871             struct xlate_in xin;
872
873             xlate_in_init(&xin, upcall->ofproto, &upcall->flow, NULL, 0, packet);
874             xlate_actions_for_side_effects(&xin);
875         }
876
877         if (upcall->flow.in_port.ofp_port
878             != vsp_realdev_to_vlandev(upcall->ofproto,
879                                       upcall->flow.in_port.ofp_port,
880                                       upcall->flow.vlan_tci)) {
881             /* This packet was received on a VLAN splinter port.  We
882              * added a VLAN to the packet to make the packet resemble
883              * the flow, but the actions were composed assuming that
884              * the packet contained no VLAN.  So, we must remove the
885              * VLAN header from the packet before trying to execute the
886              * actions. */
887             if (ofpbuf_size(&upcall->xout.odp_actions)) {
888                 eth_pop_vlan(packet);
889             }
890
891             /* Remove the flow vlan tags inserted by vlan splinter logic
892              * to ensure megaflow masks generated match the data path flow. */
893             upcall->flow.vlan_tci = 0;
894         }
895
896         /* Do not install a flow into the datapath if:
897          *
898          *    - The datapath already has too many flows.
899          *
900          *    - We received this packet via some flow installed in the kernel
901          *      already. */
902         if (may_put
903             && upcall->dpif_upcall.type == DPIF_UC_MISS) {
904             struct ofpbuf mask;
905             bool megaflow;
906
907             atomic_read(&enable_megaflows, &megaflow);
908             ofpbuf_use_stack(&mask, &upcall->mask_buf, sizeof upcall->mask_buf);
909             if (megaflow) {
910                 size_t max_mpls;
911                 bool recirc;
912
913                 recirc = ofproto_dpif_get_enable_recirc(upcall->ofproto);
914                 max_mpls = ofproto_dpif_get_max_mpls_depth(upcall->ofproto);
915                 odp_flow_key_from_mask(&mask, &upcall->xout.wc.masks,
916                                        &upcall->flow, UINT32_MAX, max_mpls,
917                                        recirc);
918             }
919
920             op = &ops[n_ops++];
921             op->type = DPIF_OP_FLOW_PUT;
922             op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
923             op->u.flow_put.key = upcall->key;
924             op->u.flow_put.key_len = upcall->key_len;
925             op->u.flow_put.mask = ofpbuf_data(&mask);
926             op->u.flow_put.mask_len = ofpbuf_size(&mask);
927             op->u.flow_put.stats = NULL;
928
929             if (!upcall->xout.slow) {
930                 op->u.flow_put.actions = ofpbuf_data(&upcall->xout.odp_actions);
931                 op->u.flow_put.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
932             } else {
933                 struct ofpbuf buf;
934
935                 ofpbuf_use_stack(&buf, upcall->slow_path_buf,
936                                  sizeof upcall->slow_path_buf);
937                 compose_slow_path(udpif, &upcall->xout, &upcall->flow,
938                                   upcall->odp_in_port, &buf);
939                 op->u.flow_put.actions = ofpbuf_data(&buf);
940                 op->u.flow_put.actions_len = ofpbuf_size(&buf);
941             }
942         }
943
944         if (ofpbuf_size(&upcall->xout.odp_actions)) {
945
946             op = &ops[n_ops++];
947             op->type = DPIF_OP_EXECUTE;
948             op->u.execute.packet = packet;
949             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
950                                     &op->u.execute.md);
951             op->u.execute.actions = ofpbuf_data(&upcall->xout.odp_actions);
952             op->u.execute.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
953             op->u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
954         }
955     }
956
957     /* Special case for fail-open mode.
958      *
959      * If we are in fail-open mode, but we are connected to a controller too,
960      * then we should send the packet up to the controller in the hope that it
961      * will try to set up a flow and thereby allow us to exit fail-open.
962      *
963      * See the top-level comment in fail-open.c for more information.
964      *
965      * Copy packets before they are modified by execution. */
966     if (fail_open) {
967         for (i = 0; i < n_upcalls; i++) {
968             struct upcall *upcall = &upcalls[i];
969             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
970             struct ofproto_packet_in *pin;
971
972             pin = xmalloc(sizeof *pin);
973             pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
974             pin->up.packet_len = ofpbuf_size(packet);
975             pin->up.reason = OFPR_NO_MATCH;
976             pin->up.table_id = 0;
977             pin->up.cookie = OVS_BE64_MAX;
978             flow_get_metadata(&upcall->flow, &pin->up.fmd);
979             pin->send_len = 0; /* Not used for flow table misses. */
980             pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
981             ofproto_dpif_send_packet_in(upcall->ofproto, pin);
982         }
983     }
984
985     /* Execute batch. */
986     for (i = 0; i < n_ops; i++) {
987         opsp[i] = &ops[i];
988     }
989     dpif_operate(udpif->dpif, opsp, n_ops);
990 }
991
992 /* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. */
993 static struct udpif_key *
994 ukey_lookup__(struct udpif *udpif, const struct nlattr *key, size_t key_len,
995               uint32_t hash)
996 {
997     struct udpif_key *ukey;
998     struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
999
1000     HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
1001         if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
1002             return ukey;
1003         }
1004     }
1005     return NULL;
1006 }
1007
1008 static struct udpif_key *
1009 ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1010             uint32_t hash)
1011 {
1012     struct udpif_key *ukey;
1013     uint32_t idx = hash % udpif->n_revalidators;
1014
1015     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1016     ukey = ukey_lookup__(udpif, key, key_len, hash);
1017     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1018
1019     return ukey;
1020 }
1021
1022 static struct udpif_key *
1023 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
1024 {
1025     struct udpif_key *ukey = xmalloc(sizeof *ukey);
1026     ovs_mutex_init(&ukey->mutex);
1027
1028     ukey->key = (struct nlattr *) &ukey->key_buf;
1029     memcpy(&ukey->key_buf, key, key_len);
1030     ukey->key_len = key_len;
1031
1032     ovs_mutex_lock(&ukey->mutex);
1033     ukey->mark = false;
1034     ukey->flow_exists = true;
1035     ukey->created = used ? used : time_msec();
1036     memset(&ukey->stats, 0, sizeof ukey->stats);
1037     ukey->xcache = NULL;
1038     ovs_mutex_unlock(&ukey->mutex);
1039
1040     return ukey;
1041 }
1042
1043 /* Checks for a ukey in 'udpif->ukeys' with the same 'ukey->key' and 'hash',
1044  * and inserts 'ukey' if it does not exist.
1045  *
1046  * Returns true if 'ukey' was inserted into 'udpif->ukeys', false otherwise. */
1047 static bool
1048 udpif_insert_ukey(struct udpif *udpif, struct udpif_key *ukey, uint32_t hash)
1049 {
1050     struct udpif_key *duplicate;
1051     uint32_t idx = hash % udpif->n_revalidators;
1052     bool ok;
1053
1054     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1055     duplicate = ukey_lookup__(udpif, ukey->key, ukey->key_len, hash);
1056     if (duplicate) {
1057         ok = false;
1058     } else {
1059         hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
1060         ok = true;
1061     }
1062     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1063
1064     return ok;
1065 }
1066
1067 static void
1068 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
1069     OVS_NO_THREAD_SAFETY_ANALYSIS
1070 {
1071     if (revalidator) {
1072         hmap_remove(revalidator->ukeys, &ukey->hmap_node);
1073     }
1074     xlate_cache_delete(ukey->xcache);
1075     ovs_mutex_destroy(&ukey->mutex);
1076     free(ukey);
1077 }
1078
1079 static bool
1080 should_revalidate(uint64_t packets, long long int used)
1081 {
1082     long long int metric, now, duration;
1083
1084     /* Calculate the mean time between seeing these packets. If this
1085      * exceeds the threshold, then delete the flow rather than performing
1086      * costly revalidation for flows that aren't being hit frequently.
1087      *
1088      * This is targeted at situations where the dump_duration is high (~1s),
1089      * and revalidation is triggered by a call to udpif_revalidate(). In
1090      * these situations, revalidation of all flows causes fluctuations in the
1091      * flow_limit due to the interaction with the dump_duration and max_idle.
1092      * This tends to result in deletion of low-throughput flows anyway, so
1093      * skip the revalidation and just delete those flows. */
1094     packets = MAX(packets, 1);
1095     now = MAX(used, time_msec());
1096     duration = now - used;
1097     metric = duration / packets;
1098
1099     if (metric > 200) {
1100         return false;
1101     }
1102     return true;
1103 }
1104
1105 static bool
1106 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1107                 const struct dpif_flow *f)
1108 {
1109     uint64_t slow_path_buf[128 / 8];
1110     struct xlate_out xout, *xoutp;
1111     struct netflow *netflow;
1112     struct ofproto_dpif *ofproto;
1113     struct dpif_flow_stats push;
1114     struct ofpbuf xout_actions;
1115     struct flow flow, dp_mask;
1116     uint32_t *dp32, *xout32;
1117     odp_port_t odp_in_port;
1118     struct xlate_in xin;
1119     long long int last_used;
1120     int error;
1121     size_t i;
1122     bool may_learn, ok;
1123
1124     ok = false;
1125     xoutp = NULL;
1126     netflow = NULL;
1127
1128     ovs_mutex_lock(&ukey->mutex);
1129     last_used = ukey->stats.used;
1130     push.used = f->stats.used;
1131     push.tcp_flags = f->stats.tcp_flags;
1132     push.n_packets = (f->stats.n_packets > ukey->stats.n_packets
1133                       ? f->stats.n_packets - ukey->stats.n_packets
1134                       : 0);
1135     push.n_bytes = (f->stats.n_bytes > ukey->stats.n_bytes
1136                     ? f->stats.n_bytes - ukey->stats.n_bytes
1137                     : 0);
1138
1139     if (!ukey->flow_exists) {
1140         /* Don't bother revalidating if the flow was already deleted. */
1141         goto exit;
1142     }
1143
1144     if (udpif->need_revalidate && last_used
1145         && !should_revalidate(push.n_packets, last_used)) {
1146         ok = false;
1147         goto exit;
1148     }
1149
1150     /* We will push the stats, so update the ukey stats cache. */
1151     ukey->stats = f->stats;
1152     if (!push.n_packets && !udpif->need_revalidate) {
1153         ok = true;
1154         goto exit;
1155     }
1156
1157     may_learn = push.n_packets > 0;
1158     if (ukey->xcache && !udpif->need_revalidate) {
1159         xlate_push_stats(ukey->xcache, may_learn, &push);
1160         ok = true;
1161         goto exit;
1162     }
1163
1164     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
1165                           &ofproto, NULL, NULL, &netflow, &odp_in_port);
1166     if (error) {
1167         goto exit;
1168     }
1169
1170     if (udpif->need_revalidate) {
1171         xlate_cache_clear(ukey->xcache);
1172     }
1173     if (!ukey->xcache) {
1174         ukey->xcache = xlate_cache_new();
1175     }
1176
1177     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
1178     xin.resubmit_stats = push.n_packets ? &push : NULL;
1179     xin.xcache = ukey->xcache;
1180     xin.may_learn = may_learn;
1181     xin.skip_wildcards = !udpif->need_revalidate;
1182     xlate_actions(&xin, &xout);
1183     xoutp = &xout;
1184
1185     if (!udpif->need_revalidate) {
1186         ok = true;
1187         goto exit;
1188     }
1189
1190     if (!xout.slow) {
1191         ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
1192                          ofpbuf_size(&xout.odp_actions));
1193     } else {
1194         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
1195         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
1196     }
1197
1198     if (f->actions_len != ofpbuf_size(&xout_actions)
1199         || memcmp(ofpbuf_data(&xout_actions), f->actions, f->actions_len)) {
1200         goto exit;
1201     }
1202
1203     if (odp_flow_key_to_mask(f->mask, f->mask_len, &dp_mask, &flow)
1204         == ODP_FIT_ERROR) {
1205         goto exit;
1206     }
1207
1208     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1209      * directly check that the masks are the same.  Instead we check that the
1210      * mask in the kernel is more specific i.e. less wildcarded, than what
1211      * we've calculated here.  This guarantees we don't catch any packets we
1212      * shouldn't with the megaflow. */
1213     dp32 = (uint32_t *) &dp_mask;
1214     xout32 = (uint32_t *) &xout.wc.masks;
1215     for (i = 0; i < FLOW_U32S; i++) {
1216         if ((dp32[i] | xout32[i]) != dp32[i]) {
1217             goto exit;
1218         }
1219     }
1220     ok = true;
1221
1222 exit:
1223     ovs_mutex_unlock(&ukey->mutex);
1224     if (netflow) {
1225         if (!ok) {
1226             netflow_expire(netflow, &flow);
1227             netflow_flow_clear(netflow, &flow);
1228         }
1229         netflow_unref(netflow);
1230     }
1231     xlate_out_uninit(xoutp);
1232     return ok;
1233 }
1234
1235 struct dump_op {
1236     struct udpif_key *ukey;
1237     struct dpif_flow_stats stats; /* Stats for 'op'. */
1238     struct dpif_op op;            /* Flow del operation. */
1239 };
1240
1241 static void
1242 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
1243              struct udpif_key *ukey)
1244 {
1245     op->ukey = ukey;
1246     op->op.type = DPIF_OP_FLOW_DEL;
1247     op->op.u.flow_del.key = key;
1248     op->op.u.flow_del.key_len = key_len;
1249     op->op.u.flow_del.stats = &op->stats;
1250 }
1251
1252 static void
1253 push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
1254 {
1255     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1256     size_t i;
1257
1258     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1259     for (i = 0; i < n_ops; i++) {
1260         opsp[i] = &ops[i].op;
1261     }
1262     dpif_operate(udpif->dpif, opsp, n_ops);
1263
1264     for (i = 0; i < n_ops; i++) {
1265         struct dump_op *op = &ops[i];
1266         struct dpif_flow_stats *push, *stats, push_buf;
1267
1268         stats = op->op.u.flow_del.stats;
1269         if (op->ukey) {
1270             push = &push_buf;
1271             ovs_mutex_lock(&op->ukey->mutex);
1272             push->used = MAX(stats->used, op->ukey->stats.used);
1273             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1274             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1275             push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1276             ovs_mutex_unlock(&op->ukey->mutex);
1277         } else {
1278             push = stats;
1279         }
1280
1281         if (push->n_packets || netflow_exists()) {
1282             struct ofproto_dpif *ofproto;
1283             struct netflow *netflow;
1284             struct flow flow;
1285             bool may_learn;
1286
1287             may_learn = push->n_packets > 0;
1288             if (op->ukey) {
1289                 ovs_mutex_lock(&op->ukey->mutex);
1290                 if (op->ukey->xcache) {
1291                     xlate_push_stats(op->ukey->xcache, may_learn, push);
1292                     ovs_mutex_unlock(&op->ukey->mutex);
1293                     continue;
1294                 }
1295                 ovs_mutex_unlock(&op->ukey->mutex);
1296             }
1297
1298             if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
1299                                op->op.u.flow_del.key_len, &flow, &ofproto,
1300                                NULL, NULL, &netflow, NULL)) {
1301                 struct xlate_in xin;
1302
1303                 xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
1304                               NULL);
1305                 xin.resubmit_stats = push->n_packets ? push : NULL;
1306                 xin.may_learn = may_learn;
1307                 xin.skip_wildcards = true;
1308                 xlate_actions_for_side_effects(&xin);
1309
1310                 if (netflow) {
1311                     netflow_expire(netflow, &flow);
1312                     netflow_flow_clear(netflow, &flow);
1313                     netflow_unref(netflow);
1314                 }
1315             }
1316         }
1317     }
1318 }
1319
1320 static void
1321 push_dump_ops(struct revalidator *revalidator,
1322               struct dump_op *ops, size_t n_ops)
1323 {
1324     int i;
1325
1326     push_dump_ops__(revalidator->udpif, ops, n_ops);
1327     for (i = 0; i < n_ops; i++) {
1328         ukey_delete(revalidator, ops[i].ukey);
1329     }
1330 }
1331
1332 static void
1333 revalidate(struct revalidator *revalidator)
1334 {
1335     struct udpif *udpif = revalidator->udpif;
1336     struct dpif_flow_dump_thread *dump_thread;
1337     unsigned int flow_limit;
1338
1339     atomic_read(&udpif->flow_limit, &flow_limit);
1340     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
1341     for (;;) {
1342         struct dump_op ops[REVALIDATE_MAX_BATCH];
1343         int n_ops = 0;
1344
1345         struct dpif_flow flows[REVALIDATE_MAX_BATCH];
1346         const struct dpif_flow *f;
1347         int n_dumped;
1348
1349         long long int max_idle;
1350         long long int now;
1351         size_t n_dp_flows;
1352         bool kill_them_all;
1353
1354         n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
1355         if (!n_dumped) {
1356             break;
1357         }
1358
1359         now = time_msec();
1360
1361         /* In normal operation we want to keep flows around until they have
1362          * been idle for 'ofproto_max_idle' milliseconds.  However:
1363          *
1364          *     - If the number of datapath flows climbs above 'flow_limit',
1365          *       drop that down to 100 ms to try to bring the flows down to
1366          *       the limit.
1367          *
1368          *     - If the number of datapath flows climbs above twice
1369          *       'flow_limit', delete all the datapath flows as an emergency
1370          *       measure.  (We reassess this condition for the next batch of
1371          *       datapath flows, so we will recover before all the flows are
1372          *       gone.) */
1373         n_dp_flows = udpif_get_n_flows(udpif);
1374         kill_them_all = n_dp_flows > flow_limit * 2;
1375         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
1376
1377         for (f = flows; f < &flows[n_dumped]; f++) {
1378             long long int used = f->stats.used;
1379             uint32_t hash = hash_bytes(f->key, f->key_len, udpif->secret);
1380             struct udpif_key *ukey = ukey_lookup(udpif, f->key, f->key_len,
1381                                                  hash);
1382             bool mark;
1383
1384             if (ukey) {
1385                 bool already_dumped;
1386
1387                 ovs_mutex_lock(&ukey->mutex);
1388                 already_dumped = ukey->mark || !ukey->flow_exists;
1389                 if (!used) {
1390                     used = ukey->created;
1391                 }
1392                 ovs_mutex_unlock(&ukey->mutex);
1393
1394                 if (already_dumped) {
1395                     /* The flow has already been dumped. This can occasionally
1396                      * occur if the datapath is changed in the middle of a flow
1397                      * dump. Rather than perform the same work twice, skip the
1398                      * flow this time. */
1399                     COVERAGE_INC(upcall_duplicate_flow);
1400                     continue;
1401                 }
1402             }
1403
1404             if (kill_them_all || (used && used < now - max_idle)) {
1405                 mark = false;
1406             } else {
1407                 if (!ukey) {
1408                     ukey = ukey_create(f->key, f->key_len, used);
1409                     if (!udpif_insert_ukey(udpif, ukey, hash)) {
1410                         /* The same ukey has already been created. This means
1411                          * that another revalidator is processing this flow
1412                          * concurrently, so don't bother processing it. */
1413                         ukey_delete(NULL, ukey);
1414                         continue;
1415                     }
1416                 }
1417
1418                 mark = revalidate_ukey(udpif, ukey, f);
1419             }
1420
1421             if (ukey) {
1422                 ovs_mutex_lock(&ukey->mutex);
1423                 ukey->mark = ukey->flow_exists = mark;
1424                 ovs_mutex_unlock(&ukey->mutex);
1425             }
1426
1427             if (!mark) {
1428                 dump_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
1429             }
1430         }
1431
1432         if (n_ops) {
1433             push_dump_ops__(udpif, ops, n_ops);
1434         }
1435     }
1436     dpif_flow_dump_thread_destroy(dump_thread);
1437 }
1438
1439 static void
1440 revalidator_sweep__(struct revalidator *revalidator, bool purge)
1441     OVS_NO_THREAD_SAFETY_ANALYSIS
1442 {
1443     struct dump_op ops[REVALIDATE_MAX_BATCH];
1444     struct udpif_key *ukey, *next;
1445     size_t n_ops;
1446
1447     n_ops = 0;
1448
1449     /* During garbage collection, this revalidator completely owns its ukeys
1450      * map, and therefore doesn't need to do any locking. */
1451     HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
1452         if (!purge && ukey->mark) {
1453             ukey->mark = false;
1454         } else if (!ukey->flow_exists) {
1455             ukey_delete(revalidator, ukey);
1456         } else {
1457             struct dump_op *op = &ops[n_ops++];
1458
1459             /* If we have previously seen a flow in the datapath, but didn't
1460              * see it during the most recent dump, delete it. This allows us
1461              * to clean up the ukey and keep the statistics consistent. */
1462             dump_op_init(op, ukey->key, ukey->key_len, ukey);
1463             if (n_ops == REVALIDATE_MAX_BATCH) {
1464                 push_dump_ops(revalidator, ops, n_ops);
1465                 n_ops = 0;
1466             }
1467         }
1468     }
1469
1470     if (n_ops) {
1471         push_dump_ops(revalidator, ops, n_ops);
1472     }
1473 }
1474
1475 static void
1476 revalidator_sweep(struct revalidator *revalidator)
1477 {
1478     revalidator_sweep__(revalidator, false);
1479 }
1480
1481 static void
1482 revalidator_purge(struct revalidator *revalidator)
1483 {
1484     revalidator_sweep__(revalidator, true);
1485 }
1486 \f
1487 static void
1488 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
1489                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
1490 {
1491     struct ds ds = DS_EMPTY_INITIALIZER;
1492     struct udpif *udpif;
1493
1494     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1495         unsigned int flow_limit;
1496         size_t i;
1497
1498         atomic_read(&udpif->flow_limit, &flow_limit);
1499
1500         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
1501         ds_put_format(&ds, "\tflows         : (current %lu)"
1502             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
1503             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
1504         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
1505
1506         ds_put_char(&ds, '\n');
1507         for (i = 0; i < n_revalidators; i++) {
1508             struct revalidator *revalidator = &udpif->revalidators[i];
1509
1510             ovs_mutex_lock(&udpif->ukeys[i].mutex);
1511             ds_put_format(&ds, "\t%u: (keys %"PRIuSIZE")\n",
1512                           revalidator->id, hmap_count(&udpif->ukeys[i].hmap));
1513             ovs_mutex_unlock(&udpif->ukeys[i].mutex);
1514         }
1515     }
1516
1517     unixctl_command_reply(conn, ds_cstr(&ds));
1518     ds_destroy(&ds);
1519 }
1520
1521 /* Disable using the megaflows.
1522  *
1523  * This command is only needed for advanced debugging, so it's not
1524  * documented in the man page. */
1525 static void
1526 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
1527                                  int argc OVS_UNUSED,
1528                                  const char *argv[] OVS_UNUSED,
1529                                  void *aux OVS_UNUSED)
1530 {
1531     atomic_store(&enable_megaflows, false);
1532     udpif_flush_all_datapaths();
1533     unixctl_command_reply(conn, "megaflows disabled");
1534 }
1535
1536 /* Re-enable using megaflows.
1537  *
1538  * This command is only needed for advanced debugging, so it's not
1539  * documented in the man page. */
1540 static void
1541 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
1542                                 int argc OVS_UNUSED,
1543                                 const char *argv[] OVS_UNUSED,
1544                                 void *aux OVS_UNUSED)
1545 {
1546     atomic_store(&enable_megaflows, true);
1547     udpif_flush_all_datapaths();
1548     unixctl_command_reply(conn, "megaflows enabled");
1549 }
1550
1551 /* Set the flow limit.
1552  *
1553  * This command is only needed for advanced debugging, so it's not
1554  * documented in the man page. */
1555 static void
1556 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
1557                               int argc OVS_UNUSED,
1558                               const char *argv[] OVS_UNUSED,
1559                               void *aux OVS_UNUSED)
1560 {
1561     struct ds ds = DS_EMPTY_INITIALIZER;
1562     struct udpif *udpif;
1563     unsigned int flow_limit = atoi(argv[1]);
1564
1565     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1566         atomic_store(&udpif->flow_limit, flow_limit);
1567     }
1568     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
1569     unixctl_command_reply(conn, ds_cstr(&ds));
1570     ds_destroy(&ds);
1571 }