ofproto: Report controller rate limiting statistics in database.
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "dpif.h"
25 #include "dynamic-string.h"
26 #include "fail-open.h"
27 #include "guarded-list.h"
28 #include "latch.h"
29 #include "list.h"
30 #include "netlink.h"
31 #include "ofpbuf.h"
32 #include "ofproto-dpif-ipfix.h"
33 #include "ofproto-dpif-sflow.h"
34 #include "ofproto-dpif-xlate.h"
35 #include "ovs-rcu.h"
36 #include "packets.h"
37 #include "poll-loop.h"
38 #include "seq.h"
39 #include "unixctl.h"
40 #include "vlog.h"
41
42 #define MAX_QUEUE_LENGTH 512
43 #define UPCALL_MAX_BATCH 50
44 #define REVALIDATE_MAX_BATCH 50
45
46 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
47
48 COVERAGE_DEFINE(upcall_duplicate_flow);
49 COVERAGE_DEFINE(revalidate_missed_dp_flow);
50
51 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
52  * and possibly sets up a kernel flow as a cache. */
53 struct handler {
54     struct udpif *udpif;               /* Parent udpif. */
55     pthread_t thread;                  /* Thread ID. */
56     uint32_t handler_id;               /* Handler id. */
57 };
58
59 /* A thread that processes datapath flows, updates OpenFlow statistics, and
60  * updates or removes them if necessary. */
61 struct revalidator {
62     struct udpif *udpif;               /* Parent udpif. */
63     pthread_t thread;                  /* Thread ID. */
64     unsigned int id;                   /* ovsthread_id_self(). */
65     struct hmap *ukeys;                /* Points into udpif->ukeys for this
66                                           revalidator. Used for GC phase. */
67 };
68
69 /* An upcall handler for ofproto_dpif.
70  *
71  * udpif keeps records of two kind of logically separate units:
72  *
73  * upcall handling
74  * ---------------
75  *
76  *    - An array of 'struct handler's for upcall handling and flow
77  *      installation.
78  *
79  * flow revalidation
80  * -----------------
81  *
82  *    - Revalidation threads which read the datapath flow table and maintains
83  *      them.
84  */
85 struct udpif {
86     struct list list_node;             /* In all_udpifs list. */
87
88     struct dpif *dpif;                 /* Datapath handle. */
89     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
90
91     uint32_t secret;                   /* Random seed for upcall hash. */
92
93     struct handler *handlers;          /* Upcall handlers. */
94     size_t n_handlers;
95
96     struct revalidator *revalidators;  /* Flow revalidators. */
97     size_t n_revalidators;
98
99     struct latch exit_latch;           /* Tells child threads to exit. */
100
101     /* Revalidation. */
102     struct seq *reval_seq;             /* Incremented to force revalidation. */
103     bool need_revalidate;              /* As indicated by 'reval_seq'. */
104     bool reval_exit;                   /* Set by leader on 'exit_latch. */
105     struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
106     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
107     long long int dump_duration;       /* Duration of the last flow dump. */
108     struct seq *dump_seq;              /* Increments each dump iteration. */
109
110     /* There are 'n_revalidators' ukey hmaps. Each revalidator retains a
111      * reference to one of these for garbage collection.
112      *
113      * During the flow dump phase, revalidators insert into these with a random
114      * distribution. During the garbage collection phase, each revalidator
115      * takes care of garbage collecting one of these hmaps. */
116     struct {
117         struct ovs_mutex mutex;        /* Guards the following. */
118         struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
119     } *ukeys;
120
121     /* Datapath flow statistics. */
122     unsigned int max_n_flows;
123     unsigned int avg_n_flows;
124
125     /* Following fields are accessed and modified by different threads. */
126     atomic_uint flow_limit;            /* Datapath flow hard limit. */
127
128     /* n_flows_mutex prevents multiple threads updating these concurrently. */
129     atomic_ulong n_flows;           /* Number of flows in the datapath. */
130     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
131     struct ovs_mutex n_flows_mutex;
132
133     /* Following fields are accessed and modified only from the main thread. */
134     struct unixctl_conn **conns;       /* Connections waiting on dump_seq. */
135     uint64_t conn_seq;                 /* Corresponds to 'dump_seq' when
136                                           conns[n_conns-1] was stored. */
137     size_t n_conns;                    /* Number of connections waiting. */
138 };
139
140 enum upcall_type {
141     BAD_UPCALL,                 /* Some kind of bug somewhere. */
142     MISS_UPCALL,                /* A flow miss.  */
143     SFLOW_UPCALL,               /* sFlow sample. */
144     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
145     IPFIX_UPCALL                /* Per-bridge sampling. */
146 };
147
148 struct upcall {
149     struct ofproto_dpif *ofproto;
150
151     struct flow flow;
152     const struct nlattr *key;
153     size_t key_len;
154     enum dpif_upcall_type upcall_type;
155     struct dpif_flow_stats stats;
156     odp_port_t odp_in_port;
157
158     uint64_t slow_path_buf[128 / 8];
159     struct odputil_keybuf mask_buf;
160
161     struct xlate_out xout;
162
163     /* Raw upcall plus data for keeping track of the memory backing it. */
164     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
165     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
166     uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
167 };
168
169 /* 'udpif_key's are responsible for tracking the little bit of state udpif
170  * needs to do flow expiration which can't be pulled directly from the
171  * datapath.  They may be created or maintained by any revalidator during
172  * the dump phase, but are owned by a single revalidator, and are destroyed
173  * by that revalidator during the garbage-collection phase.
174  *
175  * While some elements of a udpif_key are protected by a mutex, the ukey itself
176  * is not.  Therefore it is not safe to destroy a udpif_key except when all
177  * revalidators are in garbage collection phase, or they aren't running. */
178 struct udpif_key {
179     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
180
181     /* These elements are read only once created, and therefore aren't
182      * protected by a mutex. */
183     const struct nlattr *key;      /* Datapath flow key. */
184     size_t key_len;                /* Length of 'key'. */
185
186     struct ovs_mutex mutex;                   /* Guards the following. */
187     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
188     long long int created OVS_GUARDED;        /* Estimate of creation time. */
189     uint64_t dump_seq OVS_GUARDED;            /* Tracks udpif->dump_seq. */
190     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
191                                                  once. */
192
193     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
194                                                * are affected by this ukey.
195                                                * Used for stats and learning.*/
196     struct odputil_keybuf key_buf;            /* Memory for 'key'. */
197 };
198
199 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
200 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
201
202 static size_t read_upcalls(struct handler *,
203                            struct upcall upcalls[UPCALL_MAX_BATCH]);
204 static void handle_upcalls(struct handler *, struct upcall *, size_t n_upcalls);
205 static void udpif_stop_threads(struct udpif *);
206 static void udpif_start_threads(struct udpif *, size_t n_handlers,
207                                 size_t n_revalidators);
208 static void *udpif_upcall_handler(void *);
209 static void *udpif_revalidator(void *);
210 static unsigned long udpif_get_n_flows(struct udpif *);
211 static void revalidate(struct revalidator *);
212 static void revalidator_sweep(struct revalidator *);
213 static void revalidator_purge(struct revalidator *);
214 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
215                                 const char *argv[], void *aux);
216 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
217                                              const char *argv[], void *aux);
218 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
219                                             const char *argv[], void *aux);
220 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
221                                             const char *argv[], void *aux);
222 static void upcall_unixctl_dump_wait(struct unixctl_conn *conn, int argc,
223                                      const char *argv[], void *aux);
224
225 static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
226                                      long long int used);
227 static struct udpif_key *ukey_lookup(struct udpif *udpif,
228                                      const struct nlattr *key, size_t key_len,
229                                      uint32_t hash);
230 static bool ukey_acquire(struct udpif *udpif, const struct nlattr *key,
231                          size_t key_len, long long int used,
232                          struct udpif_key **result);
233 static void ukey_delete(struct revalidator *, struct udpif_key *);
234
235 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
236
237 struct udpif *
238 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
239 {
240     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
241     struct udpif *udpif = xzalloc(sizeof *udpif);
242
243     if (ovsthread_once_start(&once)) {
244         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
245                                  NULL);
246         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
247                                  upcall_unixctl_disable_megaflows, NULL);
248         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
249                                  upcall_unixctl_enable_megaflows, NULL);
250         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
251                                  upcall_unixctl_set_flow_limit, NULL);
252         unixctl_command_register("revalidator/wait", "", 0, 0,
253                                  upcall_unixctl_dump_wait, NULL);
254         ovsthread_once_done(&once);
255     }
256
257     udpif->dpif = dpif;
258     udpif->backer = backer;
259     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
260     udpif->secret = random_uint32();
261     udpif->reval_seq = seq_create();
262     udpif->dump_seq = seq_create();
263     latch_init(&udpif->exit_latch);
264     list_push_back(&all_udpifs, &udpif->list_node);
265     atomic_init(&udpif->n_flows, 0);
266     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
267     ovs_mutex_init(&udpif->n_flows_mutex);
268
269     return udpif;
270 }
271
272 void
273 udpif_run(struct udpif *udpif)
274 {
275     if (udpif->conns && udpif->conn_seq != seq_read(udpif->dump_seq)) {
276         int i;
277
278         for (i = 0; i < udpif->n_conns; i++) {
279             unixctl_command_reply(udpif->conns[i], NULL);
280         }
281         free(udpif->conns);
282         udpif->conns = NULL;
283         udpif->n_conns = 0;
284     }
285 }
286
287 void
288 udpif_destroy(struct udpif *udpif)
289 {
290     udpif_stop_threads(udpif);
291
292     list_remove(&udpif->list_node);
293     latch_destroy(&udpif->exit_latch);
294     seq_destroy(udpif->reval_seq);
295     seq_destroy(udpif->dump_seq);
296     ovs_mutex_destroy(&udpif->n_flows_mutex);
297     free(udpif);
298 }
299
300 /* Stops the handler and revalidator threads, must be enclosed in
301  * ovsrcu quiescent state unless when destroying udpif. */
302 static void
303 udpif_stop_threads(struct udpif *udpif)
304 {
305     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
306         size_t i;
307
308         latch_set(&udpif->exit_latch);
309
310         for (i = 0; i < udpif->n_handlers; i++) {
311             struct handler *handler = &udpif->handlers[i];
312
313             xpthread_join(handler->thread, NULL);
314         }
315
316         for (i = 0; i < udpif->n_revalidators; i++) {
317             xpthread_join(udpif->revalidators[i].thread, NULL);
318         }
319
320         for (i = 0; i < udpif->n_revalidators; i++) {
321             struct revalidator *revalidator = &udpif->revalidators[i];
322
323             /* Delete ukeys, and delete all flows from the datapath to prevent
324              * double-counting stats. */
325             revalidator_purge(revalidator);
326
327             hmap_destroy(&udpif->ukeys[i].hmap);
328             ovs_mutex_destroy(&udpif->ukeys[i].mutex);
329         }
330
331         latch_poll(&udpif->exit_latch);
332
333         ovs_barrier_destroy(&udpif->reval_barrier);
334
335         free(udpif->revalidators);
336         udpif->revalidators = NULL;
337         udpif->n_revalidators = 0;
338
339         free(udpif->handlers);
340         udpif->handlers = NULL;
341         udpif->n_handlers = 0;
342
343         free(udpif->ukeys);
344         udpif->ukeys = NULL;
345     }
346 }
347
348 /* Starts the handler and revalidator threads, must be enclosed in
349  * ovsrcu quiescent state. */
350 static void
351 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
352                     size_t n_revalidators)
353 {
354     if (udpif && n_handlers && n_revalidators) {
355         size_t i;
356
357         udpif->n_handlers = n_handlers;
358         udpif->n_revalidators = n_revalidators;
359
360         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
361         for (i = 0; i < udpif->n_handlers; i++) {
362             struct handler *handler = &udpif->handlers[i];
363
364             handler->udpif = udpif;
365             handler->handler_id = i;
366             handler->thread = ovs_thread_create(
367                 "handler", udpif_upcall_handler, handler);
368         }
369
370         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
371         udpif->reval_exit = false;
372         udpif->revalidators = xzalloc(udpif->n_revalidators
373                                       * sizeof *udpif->revalidators);
374         udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
375         for (i = 0; i < udpif->n_revalidators; i++) {
376             struct revalidator *revalidator = &udpif->revalidators[i];
377
378             revalidator->udpif = udpif;
379             hmap_init(&udpif->ukeys[i].hmap);
380             ovs_mutex_init(&udpif->ukeys[i].mutex);
381             revalidator->ukeys = &udpif->ukeys[i].hmap;
382             revalidator->thread = ovs_thread_create(
383                 "revalidator", udpif_revalidator, revalidator);
384         }
385     }
386 }
387
388 /* Tells 'udpif' how many threads it should use to handle upcalls.
389  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
390  * datapath handle must have packet reception enabled before starting
391  * threads. */
392 void
393 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
394                   size_t n_revalidators)
395 {
396     ovs_assert(udpif);
397     ovs_assert(n_handlers && n_revalidators);
398
399     ovsrcu_quiesce_start();
400     if (udpif->n_handlers != n_handlers
401         || udpif->n_revalidators != n_revalidators) {
402         udpif_stop_threads(udpif);
403     }
404
405     if (!udpif->handlers && !udpif->revalidators) {
406         int error;
407
408         error = dpif_handlers_set(udpif->dpif, n_handlers);
409         if (error) {
410             VLOG_ERR("failed to configure handlers in dpif %s: %s",
411                      dpif_name(udpif->dpif), ovs_strerror(error));
412             return;
413         }
414
415         udpif_start_threads(udpif, n_handlers, n_revalidators);
416     }
417     ovsrcu_quiesce_end();
418 }
419
420 /* Waits for all ongoing upcall translations to complete.  This ensures that
421  * there are no transient references to any removed ofprotos (or other
422  * objects).  In particular, this should be called after an ofproto is removed
423  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
424 void
425 udpif_synchronize(struct udpif *udpif)
426 {
427     /* This is stronger than necessary.  It would be sufficient to ensure
428      * (somehow) that each handler and revalidator thread had passed through
429      * its main loop once. */
430     size_t n_handlers = udpif->n_handlers;
431     size_t n_revalidators = udpif->n_revalidators;
432
433     ovsrcu_quiesce_start();
434     udpif_stop_threads(udpif);
435     udpif_start_threads(udpif, n_handlers, n_revalidators);
436     ovsrcu_quiesce_end();
437 }
438
439 /* Notifies 'udpif' that something changed which may render previous
440  * xlate_actions() results invalid. */
441 void
442 udpif_revalidate(struct udpif *udpif)
443 {
444     seq_change(udpif->reval_seq);
445 }
446
447 /* Returns a seq which increments every time 'udpif' pulls stats from the
448  * datapath.  Callers can use this to get a sense of when might be a good time
449  * to do periodic work which relies on relatively up to date statistics. */
450 struct seq *
451 udpif_dump_seq(struct udpif *udpif)
452 {
453     return udpif->dump_seq;
454 }
455
456 void
457 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
458 {
459     size_t i;
460
461     simap_increase(usage, "handlers", udpif->n_handlers);
462
463     simap_increase(usage, "revalidators", udpif->n_revalidators);
464     for (i = 0; i < udpif->n_revalidators; i++) {
465         ovs_mutex_lock(&udpif->ukeys[i].mutex);
466         simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
467         ovs_mutex_unlock(&udpif->ukeys[i].mutex);
468     }
469 }
470
471 /* Remove flows from a single datapath. */
472 void
473 udpif_flush(struct udpif *udpif)
474 {
475     size_t n_handlers, n_revalidators;
476
477     n_handlers = udpif->n_handlers;
478     n_revalidators = udpif->n_revalidators;
479
480     ovsrcu_quiesce_start();
481
482     udpif_stop_threads(udpif);
483     dpif_flow_flush(udpif->dpif);
484     udpif_start_threads(udpif, n_handlers, n_revalidators);
485
486     ovsrcu_quiesce_end();
487 }
488
489 /* Removes all flows from all datapaths. */
490 static void
491 udpif_flush_all_datapaths(void)
492 {
493     struct udpif *udpif;
494
495     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
496         udpif_flush(udpif);
497     }
498 }
499
500 \f
501 static unsigned long
502 udpif_get_n_flows(struct udpif *udpif)
503 {
504     long long int time, now;
505     unsigned long flow_count;
506
507     now = time_msec();
508     atomic_read(&udpif->n_flows_timestamp, &time);
509     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
510         struct dpif_dp_stats stats;
511
512         atomic_store(&udpif->n_flows_timestamp, now);
513         dpif_get_dp_stats(udpif->dpif, &stats);
514         flow_count = stats.n_flows;
515         atomic_store(&udpif->n_flows, flow_count);
516         ovs_mutex_unlock(&udpif->n_flows_mutex);
517     } else {
518         atomic_read(&udpif->n_flows, &flow_count);
519     }
520     return flow_count;
521 }
522
523 /* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH
524  * upcalls from dpif, processes the batch and installs corresponding flows
525  * in dpif. */
526 static void *
527 udpif_upcall_handler(void *arg)
528 {
529     struct handler *handler = arg;
530     struct udpif *udpif = handler->udpif;
531
532     while (!latch_is_set(&handler->udpif->exit_latch)) {
533         struct upcall upcalls[UPCALL_MAX_BATCH];
534         size_t n_upcalls, i;
535
536         n_upcalls = read_upcalls(handler, upcalls);
537         if (!n_upcalls) {
538             dpif_recv_wait(udpif->dpif, handler->handler_id);
539             latch_wait(&udpif->exit_latch);
540             poll_block();
541         } else {
542             handle_upcalls(handler, upcalls, n_upcalls);
543
544             for (i = 0; i < n_upcalls; i++) {
545                 xlate_out_uninit(&upcalls[i].xout);
546                 ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
547                 ofpbuf_uninit(&upcalls[i].upcall_buf);
548             }
549         }
550         coverage_clear();
551     }
552
553     return NULL;
554 }
555
556 static void *
557 udpif_revalidator(void *arg)
558 {
559     /* Used by all revalidators. */
560     struct revalidator *revalidator = arg;
561     struct udpif *udpif = revalidator->udpif;
562     bool leader = revalidator == &udpif->revalidators[0];
563
564     /* Used only by the leader. */
565     long long int start_time = 0;
566     uint64_t last_reval_seq = 0;
567     unsigned int flow_limit = 0;
568     size_t n_flows = 0;
569
570     revalidator->id = ovsthread_id_self();
571     for (;;) {
572         if (leader) {
573             uint64_t reval_seq;
574
575             reval_seq = seq_read(udpif->reval_seq);
576             udpif->need_revalidate = last_reval_seq != reval_seq;
577             last_reval_seq = reval_seq;
578
579             n_flows = udpif_get_n_flows(udpif);
580             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
581             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
582
583             /* Only the leader checks the exit latch to prevent a race where
584              * some threads think it's true and exit and others think it's
585              * false and block indefinitely on the reval_barrier */
586             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
587
588             start_time = time_msec();
589             if (!udpif->reval_exit) {
590                 udpif->dump = dpif_flow_dump_create(udpif->dpif);
591             }
592         }
593
594         /* Wait for the leader to start the flow dump. */
595         ovs_barrier_block(&udpif->reval_barrier);
596         if (udpif->reval_exit) {
597             break;
598         }
599         revalidate(revalidator);
600
601         /* Wait for all flows to have been dumped before we garbage collect. */
602         ovs_barrier_block(&udpif->reval_barrier);
603         revalidator_sweep(revalidator);
604
605         /* Wait for all revalidators to finish garbage collection. */
606         ovs_barrier_block(&udpif->reval_barrier);
607
608         if (leader) {
609             long long int duration;
610
611             dpif_flow_dump_destroy(udpif->dump);
612             seq_change(udpif->dump_seq);
613
614             duration = MAX(time_msec() - start_time, 1);
615             atomic_read(&udpif->flow_limit, &flow_limit);
616             udpif->dump_duration = duration;
617             if (duration > 2000) {
618                 flow_limit /= duration / 1000;
619             } else if (duration > 1300) {
620                 flow_limit = flow_limit * 3 / 4;
621             } else if (duration < 1000 && n_flows > 2000
622                        && flow_limit < n_flows * 1000 / duration) {
623                 flow_limit += 1000;
624             }
625             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
626             atomic_store(&udpif->flow_limit, flow_limit);
627
628             if (duration > 2000) {
629                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
630                           duration);
631             }
632
633             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
634             seq_wait(udpif->reval_seq, last_reval_seq);
635             latch_wait(&udpif->exit_latch);
636             poll_block();
637         }
638     }
639
640     return NULL;
641 }
642 \f
643 static enum upcall_type
644 classify_upcall(const struct upcall *upcall)
645 {
646     const struct dpif_upcall *dpif_upcall = &upcall->dpif_upcall;
647     union user_action_cookie cookie;
648     size_t userdata_len;
649
650     /* First look at the upcall type. */
651     switch (dpif_upcall->type) {
652     case DPIF_UC_ACTION:
653         break;
654
655     case DPIF_UC_MISS:
656         return MISS_UPCALL;
657
658     case DPIF_N_UC_TYPES:
659     default:
660         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
661                      dpif_upcall->type);
662         return BAD_UPCALL;
663     }
664
665     /* "action" upcalls need a closer look. */
666     if (!dpif_upcall->userdata) {
667         VLOG_WARN_RL(&rl, "action upcall missing cookie");
668         return BAD_UPCALL;
669     }
670     userdata_len = nl_attr_get_size(dpif_upcall->userdata);
671     if (userdata_len < sizeof cookie.type
672         || userdata_len > sizeof cookie) {
673         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
674                      userdata_len);
675         return BAD_UPCALL;
676     }
677     memset(&cookie, 0, sizeof cookie);
678     memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
679     if (userdata_len == MAX(8, sizeof cookie.sflow)
680         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
681         return SFLOW_UPCALL;
682     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
683                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
684         return MISS_UPCALL;
685     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
686                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
687         return FLOW_SAMPLE_UPCALL;
688     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
689                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
690         return IPFIX_UPCALL;
691     } else {
692         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
693                      " and size %"PRIuSIZE, cookie.type, userdata_len);
694         return BAD_UPCALL;
695     }
696 }
697
698 /* Calculates slow path actions for 'xout'.  'buf' must statically be
699  * initialized with at least 128 bytes of space. */
700 static void
701 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
702                   struct flow *flow, odp_port_t odp_in_port,
703                   struct ofpbuf *buf)
704 {
705     union user_action_cookie cookie;
706     odp_port_t port;
707     uint32_t pid;
708
709     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
710     cookie.slow_path.unused = 0;
711     cookie.slow_path.reason = xout->slow;
712
713     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
714         ? ODPP_NONE
715         : odp_in_port;
716     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
717     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
718 }
719
720 static void
721 upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
722             struct ofproto_dpif *ofproto, struct dpif_upcall *dupcall,
723             odp_port_t odp_in_port)
724 {
725     struct pkt_metadata md = pkt_metadata_from_flow(flow);
726     struct xlate_in xin;
727
728     flow_extract(packet, &md, &upcall->flow);
729
730     upcall->ofproto = ofproto;
731     upcall->key = dupcall->key;
732     upcall->key_len = dupcall->key_len;
733     upcall->upcall_type = dupcall->type;
734     upcall->stats.n_packets = 1;
735     upcall->stats.n_bytes = ofpbuf_size(packet);
736     upcall->stats.used = time_msec();
737     upcall->stats.tcp_flags = ntohs(upcall->flow.tcp_flags);
738     upcall->odp_in_port = odp_in_port;
739
740     xlate_in_init(&xin, upcall->ofproto, &upcall->flow, NULL,
741                   upcall->stats.tcp_flags, packet);
742
743     if (upcall->upcall_type == DPIF_UC_MISS) {
744         xin.resubmit_stats = &upcall->stats;
745     } else {
746         /* For non-miss upcalls, there's a flow in the datapath which this
747          * packet was accounted to.  Presumably the revalidators will deal
748          * with pushing its stats eventually. */
749     }
750
751     xlate_actions(&xin, &upcall->xout);
752 }
753
754 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
755  * read. */
756 static size_t
757 read_upcalls(struct handler *handler,
758              struct upcall upcalls[UPCALL_MAX_BATCH])
759 {
760     struct udpif *udpif = handler->udpif;
761     size_t i;
762     size_t n_upcalls = 0;
763
764     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
765     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
766         struct upcall *upcall = &upcalls[n_upcalls];
767         struct dpif_upcall *dupcall;
768         struct ofpbuf *packet;
769         struct ofproto_dpif *ofproto;
770         struct dpif_sflow *sflow;
771         struct dpif_ipfix *ipfix;
772         struct flow flow;
773         enum upcall_type type;
774         odp_port_t odp_in_port;
775         int error;
776
777         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
778                         sizeof upcall->upcall_stub);
779         error = dpif_recv(udpif->dpif, handler->handler_id,
780                           &upcall->dpif_upcall, &upcall->upcall_buf);
781         if (error) {
782             ofpbuf_uninit(&upcall->upcall_buf);
783             break;
784         }
785
786         dupcall = &upcall->dpif_upcall;
787         packet = &dupcall->packet;
788         error = xlate_receive(udpif->backer, packet, dupcall->key,
789                               dupcall->key_len, &flow,
790                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
791         if (error) {
792             if (error == ENODEV) {
793                 /* Received packet on datapath port for which we couldn't
794                  * associate an ofproto.  This can happen if a port is removed
795                  * while traffic is being received.  Print a rate-limited
796                  * message in case it happens frequently.  Install a drop flow
797                  * so that future packets of the flow are inexpensively dropped
798                  * in the kernel. */
799                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
800                              "port %"PRIu32, odp_in_port);
801                 dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
802                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
803                               NULL);
804             }
805             goto destroy_upcall;
806         }
807
808         type = classify_upcall(upcall);
809         if (type == MISS_UPCALL) {
810             upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
811             n_upcalls++;
812             continue;
813         }
814
815         switch (type) {
816         case SFLOW_UPCALL:
817             if (sflow) {
818                 union user_action_cookie cookie;
819
820                 memset(&cookie, 0, sizeof cookie);
821                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
822                        sizeof cookie.sflow);
823                 dpif_sflow_received(sflow, packet, &flow, odp_in_port,
824                                     &cookie);
825             }
826             break;
827         case IPFIX_UPCALL:
828             if (ipfix) {
829                 dpif_ipfix_bridge_sample(ipfix, packet, &flow);
830             }
831             break;
832         case FLOW_SAMPLE_UPCALL:
833             if (ipfix) {
834                 union user_action_cookie cookie;
835
836                 memset(&cookie, 0, sizeof cookie);
837                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
838                        sizeof cookie.flow_sample);
839
840                 /* The flow reflects exactly the contents of the packet.
841                  * Sample the packet using it. */
842                 dpif_ipfix_flow_sample(ipfix, packet, &flow,
843                                        cookie.flow_sample.collector_set_id,
844                                        cookie.flow_sample.probability,
845                                        cookie.flow_sample.obs_domain_id,
846                                        cookie.flow_sample.obs_point_id);
847             }
848             break;
849         case BAD_UPCALL:
850             break;
851         case MISS_UPCALL:
852             OVS_NOT_REACHED();
853         }
854
855         dpif_ipfix_unref(ipfix);
856         dpif_sflow_unref(sflow);
857
858 destroy_upcall:
859         ofpbuf_uninit(&upcall->dpif_upcall.packet);
860         ofpbuf_uninit(&upcall->upcall_buf);
861     }
862
863     return n_upcalls;
864 }
865
866 static void
867 handle_upcalls(struct handler *handler, struct upcall *upcalls,
868                size_t n_upcalls)
869 {
870     struct udpif *udpif = handler->udpif;
871     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
872     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
873     size_t n_ops, i;
874     unsigned int flow_limit;
875     bool fail_open, may_put;
876
877     atomic_read(&udpif->flow_limit, &flow_limit);
878     may_put = udpif_get_n_flows(udpif) < flow_limit;
879
880     /* Handle the packets individually in order of arrival.
881      *
882      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
883      *     processes received packets for these protocols.
884      *
885      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
886      *     controller.
887      *
888      * The loop fills 'ops' with an array of operations to execute in the
889      * datapath. */
890     fail_open = false;
891     n_ops = 0;
892     for (i = 0; i < n_upcalls; i++) {
893         struct upcall *upcall = &upcalls[i];
894         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
895         struct dpif_op *op;
896
897         fail_open = fail_open || upcall->xout.fail_open;
898
899         if (upcall->flow.in_port.ofp_port
900             != vsp_realdev_to_vlandev(upcall->ofproto,
901                                       upcall->flow.in_port.ofp_port,
902                                       upcall->flow.vlan_tci)) {
903             /* This packet was received on a VLAN splinter port.  We
904              * added a VLAN to the packet to make the packet resemble
905              * the flow, but the actions were composed assuming that
906              * the packet contained no VLAN.  So, we must remove the
907              * VLAN header from the packet before trying to execute the
908              * actions. */
909             if (ofpbuf_size(&upcall->xout.odp_actions)) {
910                 eth_pop_vlan(packet);
911             }
912
913             /* Remove the flow vlan tags inserted by vlan splinter logic
914              * to ensure megaflow masks generated match the data path flow. */
915             upcall->flow.vlan_tci = 0;
916         }
917
918         /* Do not install a flow into the datapath if:
919          *
920          *    - The datapath already has too many flows.
921          *
922          *    - We received this packet via some flow installed in the kernel
923          *      already. */
924         if (may_put
925             && upcall->dpif_upcall.type == DPIF_UC_MISS) {
926             struct ofpbuf mask;
927             bool megaflow;
928
929             atomic_read(&enable_megaflows, &megaflow);
930             ofpbuf_use_stack(&mask, &upcall->mask_buf, sizeof upcall->mask_buf);
931             if (megaflow) {
932                 size_t max_mpls;
933                 bool recirc;
934
935                 recirc = ofproto_dpif_get_enable_recirc(upcall->ofproto);
936                 max_mpls = ofproto_dpif_get_max_mpls_depth(upcall->ofproto);
937                 odp_flow_key_from_mask(&mask, &upcall->xout.wc.masks,
938                                        &upcall->flow, UINT32_MAX, max_mpls,
939                                        recirc);
940             }
941
942             op = &ops[n_ops++];
943             op->type = DPIF_OP_FLOW_PUT;
944             op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
945             op->u.flow_put.key = upcall->key;
946             op->u.flow_put.key_len = upcall->key_len;
947             op->u.flow_put.mask = ofpbuf_data(&mask);
948             op->u.flow_put.mask_len = ofpbuf_size(&mask);
949             op->u.flow_put.stats = NULL;
950
951             if (!upcall->xout.slow) {
952                 op->u.flow_put.actions = ofpbuf_data(&upcall->xout.odp_actions);
953                 op->u.flow_put.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
954             } else {
955                 struct ofpbuf buf;
956
957                 ofpbuf_use_stack(&buf, upcall->slow_path_buf,
958                                  sizeof upcall->slow_path_buf);
959                 compose_slow_path(udpif, &upcall->xout, &upcall->flow,
960                                   upcall->odp_in_port, &buf);
961                 op->u.flow_put.actions = ofpbuf_data(&buf);
962                 op->u.flow_put.actions_len = ofpbuf_size(&buf);
963             }
964         }
965
966         if (ofpbuf_size(&upcall->xout.odp_actions)) {
967
968             op = &ops[n_ops++];
969             op->type = DPIF_OP_EXECUTE;
970             op->u.execute.packet = packet;
971             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
972                                     &op->u.execute.md);
973             op->u.execute.actions = ofpbuf_data(&upcall->xout.odp_actions);
974             op->u.execute.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
975             op->u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
976         }
977     }
978
979     /* Special case for fail-open mode.
980      *
981      * If we are in fail-open mode, but we are connected to a controller too,
982      * then we should send the packet up to the controller in the hope that it
983      * will try to set up a flow and thereby allow us to exit fail-open.
984      *
985      * See the top-level comment in fail-open.c for more information.
986      *
987      * Copy packets before they are modified by execution. */
988     if (fail_open) {
989         for (i = 0; i < n_upcalls; i++) {
990             struct upcall *upcall = &upcalls[i];
991             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
992             struct ofproto_packet_in *pin;
993
994             pin = xmalloc(sizeof *pin);
995             pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
996             pin->up.packet_len = ofpbuf_size(packet);
997             pin->up.reason = OFPR_NO_MATCH;
998             pin->up.table_id = 0;
999             pin->up.cookie = OVS_BE64_MAX;
1000             flow_get_metadata(&upcall->flow, &pin->up.fmd);
1001             pin->send_len = 0; /* Not used for flow table misses. */
1002             pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
1003             ofproto_dpif_send_packet_in(upcall->ofproto, pin);
1004         }
1005     }
1006
1007     /* Execute batch. */
1008     for (i = 0; i < n_ops; i++) {
1009         opsp[i] = &ops[i];
1010     }
1011     dpif_operate(udpif->dpif, opsp, n_ops);
1012 }
1013
1014 /* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. */
1015 static struct udpif_key *
1016 ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1017             uint32_t hash)
1018     OVS_REQUIRES(udpif->ukeys->mutex)
1019 {
1020     struct udpif_key *ukey;
1021     struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
1022
1023     HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
1024         if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
1025             return ukey;
1026         }
1027     }
1028     return NULL;
1029 }
1030
1031 /* Creates a ukey for 'key' and 'key_len', returning it with ukey->mutex in
1032  * a locked state. */
1033 static struct udpif_key *
1034 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
1035     OVS_NO_THREAD_SAFETY_ANALYSIS
1036 {
1037     struct udpif_key *ukey = xmalloc(sizeof *ukey);
1038
1039     ovs_mutex_init(&ukey->mutex);
1040     ukey->key = (struct nlattr *) &ukey->key_buf;
1041     memcpy(&ukey->key_buf, key, key_len);
1042     ukey->key_len = key_len;
1043
1044     ovs_mutex_lock(&ukey->mutex);
1045     ukey->dump_seq = 0;
1046     ukey->flow_exists = true;
1047     ukey->created = used ? used : time_msec();
1048     memset(&ukey->stats, 0, sizeof ukey->stats);
1049     ukey->xcache = NULL;
1050
1051     return ukey;
1052 }
1053
1054 /* Searches for a ukey in 'udpif->ukeys' that matches 'key' and 'key_len' and
1055  * attempts to lock the ukey. If the ukey does not exist, create it.
1056  *
1057  * Returns true on success, setting *result to the matching ukey and returning
1058  * it in a locked state. Otherwise, returns false and clears *result. */
1059 static bool
1060 ukey_acquire(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1061              long long int used, struct udpif_key **result)
1062     OVS_TRY_LOCK(true, (*result)->mutex)
1063 {
1064     struct udpif_key *ukey;
1065     uint32_t hash, idx;
1066     bool locked = false;
1067
1068     hash = hash_bytes(key, key_len, udpif->secret);
1069     idx = hash % udpif->n_revalidators;
1070
1071     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1072     ukey = ukey_lookup(udpif, key, key_len, hash);
1073     if (!ukey) {
1074         ukey = ukey_create(key, key_len, used);
1075         hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
1076         locked = true;
1077     } else if (!ovs_mutex_trylock(&ukey->mutex)) {
1078         locked = true;
1079     }
1080     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1081
1082     if (locked) {
1083         *result = ukey;
1084     } else {
1085         *result = NULL;
1086     }
1087     return locked;
1088 }
1089
1090 static void
1091 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
1092     OVS_NO_THREAD_SAFETY_ANALYSIS
1093 {
1094     if (revalidator) {
1095         hmap_remove(revalidator->ukeys, &ukey->hmap_node);
1096     }
1097     xlate_cache_delete(ukey->xcache);
1098     ovs_mutex_destroy(&ukey->mutex);
1099     free(ukey);
1100 }
1101
1102 static bool
1103 should_revalidate(const struct udpif *udpif, uint64_t packets,
1104                   long long int used)
1105 {
1106     long long int metric, now, duration;
1107
1108     if (udpif->dump_duration < 200) {
1109         /* We are likely to handle full revalidation for the flows. */
1110         return true;
1111     }
1112
1113     /* Calculate the mean time between seeing these packets. If this
1114      * exceeds the threshold, then delete the flow rather than performing
1115      * costly revalidation for flows that aren't being hit frequently.
1116      *
1117      * This is targeted at situations where the dump_duration is high (~1s),
1118      * and revalidation is triggered by a call to udpif_revalidate(). In
1119      * these situations, revalidation of all flows causes fluctuations in the
1120      * flow_limit due to the interaction with the dump_duration and max_idle.
1121      * This tends to result in deletion of low-throughput flows anyway, so
1122      * skip the revalidation and just delete those flows. */
1123     packets = MAX(packets, 1);
1124     now = MAX(used, time_msec());
1125     duration = now - used;
1126     metric = duration / packets;
1127
1128     if (metric < 200) {
1129         /* The flow is receiving more than ~5pps, so keep it. */
1130         return true;
1131     }
1132     return false;
1133 }
1134
1135 static bool
1136 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1137                 const struct dpif_flow *f)
1138     OVS_REQUIRES(ukey->mutex)
1139 {
1140     uint64_t slow_path_buf[128 / 8];
1141     struct xlate_out xout, *xoutp;
1142     struct netflow *netflow;
1143     struct ofproto_dpif *ofproto;
1144     struct dpif_flow_stats push;
1145     struct ofpbuf xout_actions;
1146     struct flow flow, dp_mask;
1147     uint32_t *dp32, *xout32;
1148     odp_port_t odp_in_port;
1149     struct xlate_in xin;
1150     long long int last_used;
1151     int error;
1152     size_t i;
1153     bool may_learn, ok;
1154
1155     ok = false;
1156     xoutp = NULL;
1157     netflow = NULL;
1158
1159     last_used = ukey->stats.used;
1160     push.used = f->stats.used;
1161     push.tcp_flags = f->stats.tcp_flags;
1162     push.n_packets = (f->stats.n_packets > ukey->stats.n_packets
1163                       ? f->stats.n_packets - ukey->stats.n_packets
1164                       : 0);
1165     push.n_bytes = (f->stats.n_bytes > ukey->stats.n_bytes
1166                     ? f->stats.n_bytes - ukey->stats.n_bytes
1167                     : 0);
1168
1169     if (udpif->need_revalidate && last_used
1170         && !should_revalidate(udpif, push.n_packets, last_used)) {
1171         ok = false;
1172         goto exit;
1173     }
1174
1175     /* We will push the stats, so update the ukey stats cache. */
1176     ukey->stats = f->stats;
1177     if (!push.n_packets && !udpif->need_revalidate) {
1178         ok = true;
1179         goto exit;
1180     }
1181
1182     may_learn = push.n_packets > 0;
1183     if (ukey->xcache && !udpif->need_revalidate) {
1184         xlate_push_stats(ukey->xcache, may_learn, &push);
1185         ok = true;
1186         goto exit;
1187     }
1188
1189     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
1190                           &ofproto, NULL, NULL, &netflow, &odp_in_port);
1191     if (error) {
1192         goto exit;
1193     }
1194
1195     if (udpif->need_revalidate) {
1196         xlate_cache_clear(ukey->xcache);
1197     }
1198     if (!ukey->xcache) {
1199         ukey->xcache = xlate_cache_new();
1200     }
1201
1202     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
1203     xin.resubmit_stats = push.n_packets ? &push : NULL;
1204     xin.xcache = ukey->xcache;
1205     xin.may_learn = may_learn;
1206     xin.skip_wildcards = !udpif->need_revalidate;
1207     xlate_actions(&xin, &xout);
1208     xoutp = &xout;
1209
1210     if (!udpif->need_revalidate) {
1211         ok = true;
1212         goto exit;
1213     }
1214
1215     if (!xout.slow) {
1216         ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
1217                          ofpbuf_size(&xout.odp_actions));
1218     } else {
1219         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
1220         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
1221     }
1222
1223     if (f->actions_len != ofpbuf_size(&xout_actions)
1224         || memcmp(ofpbuf_data(&xout_actions), f->actions, f->actions_len)) {
1225         goto exit;
1226     }
1227
1228     if (odp_flow_key_to_mask(f->mask, f->mask_len, &dp_mask, &flow)
1229         == ODP_FIT_ERROR) {
1230         goto exit;
1231     }
1232
1233     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1234      * directly check that the masks are the same.  Instead we check that the
1235      * mask in the kernel is more specific i.e. less wildcarded, than what
1236      * we've calculated here.  This guarantees we don't catch any packets we
1237      * shouldn't with the megaflow. */
1238     dp32 = (uint32_t *) &dp_mask;
1239     xout32 = (uint32_t *) &xout.wc.masks;
1240     for (i = 0; i < FLOW_U32S; i++) {
1241         if ((dp32[i] | xout32[i]) != dp32[i]) {
1242             goto exit;
1243         }
1244     }
1245     ok = true;
1246
1247 exit:
1248     if (netflow) {
1249         if (!ok) {
1250             netflow_flow_clear(netflow, &flow);
1251         }
1252         netflow_unref(netflow);
1253     }
1254     xlate_out_uninit(xoutp);
1255     return ok;
1256 }
1257
1258 struct dump_op {
1259     struct udpif_key *ukey;
1260     struct dpif_flow_stats stats; /* Stats for 'op'. */
1261     struct dpif_op op;            /* Flow del operation. */
1262 };
1263
1264 static void
1265 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
1266              struct udpif_key *ukey)
1267 {
1268     op->ukey = ukey;
1269     op->op.type = DPIF_OP_FLOW_DEL;
1270     op->op.u.flow_del.key = key;
1271     op->op.u.flow_del.key_len = key_len;
1272     op->op.u.flow_del.stats = &op->stats;
1273 }
1274
1275 static void
1276 push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
1277 {
1278     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1279     size_t i;
1280
1281     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1282     for (i = 0; i < n_ops; i++) {
1283         opsp[i] = &ops[i].op;
1284     }
1285     dpif_operate(udpif->dpif, opsp, n_ops);
1286
1287     for (i = 0; i < n_ops; i++) {
1288         struct dump_op *op = &ops[i];
1289         struct dpif_flow_stats *push, *stats, push_buf;
1290
1291         stats = op->op.u.flow_del.stats;
1292         push = &push_buf;
1293
1294         ovs_mutex_lock(&op->ukey->mutex);
1295         push->used = MAX(stats->used, op->ukey->stats.used);
1296         push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1297         push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1298         push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1299         ovs_mutex_unlock(&op->ukey->mutex);
1300
1301         if (push->n_packets || netflow_exists()) {
1302             struct ofproto_dpif *ofproto;
1303             struct netflow *netflow;
1304             struct flow flow;
1305             bool may_learn;
1306             int error;
1307
1308             may_learn = push->n_packets > 0;
1309             ovs_mutex_lock(&op->ukey->mutex);
1310             if (op->ukey->xcache) {
1311                 xlate_push_stats(op->ukey->xcache, may_learn, push);
1312                 ovs_mutex_unlock(&op->ukey->mutex);
1313                 continue;
1314             }
1315             ovs_mutex_unlock(&op->ukey->mutex);
1316
1317             error = xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
1318                                   op->op.u.flow_del.key_len, &flow, &ofproto,
1319                                   NULL, NULL, &netflow, NULL);
1320             if (!error) {
1321                 struct xlate_in xin;
1322
1323                 xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
1324                               NULL);
1325                 xin.resubmit_stats = push->n_packets ? push : NULL;
1326                 xin.may_learn = may_learn;
1327                 xin.skip_wildcards = true;
1328                 xlate_actions_for_side_effects(&xin);
1329
1330                 if (netflow) {
1331                     netflow_flow_clear(netflow, &flow);
1332                     netflow_unref(netflow);
1333                 }
1334             }
1335         }
1336     }
1337 }
1338
1339 static void
1340 push_dump_ops(struct revalidator *revalidator,
1341               struct dump_op *ops, size_t n_ops)
1342 {
1343     int i;
1344
1345     push_dump_ops__(revalidator->udpif, ops, n_ops);
1346     for (i = 0; i < n_ops; i++) {
1347         ukey_delete(revalidator, ops[i].ukey);
1348     }
1349 }
1350
1351 static void
1352 revalidate(struct revalidator *revalidator)
1353 {
1354     struct udpif *udpif = revalidator->udpif;
1355     struct dpif_flow_dump_thread *dump_thread;
1356     uint64_t dump_seq;
1357     unsigned int flow_limit;
1358
1359     dump_seq = seq_read(udpif->dump_seq);
1360     atomic_read(&udpif->flow_limit, &flow_limit);
1361     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
1362     for (;;) {
1363         struct dump_op ops[REVALIDATE_MAX_BATCH];
1364         int n_ops = 0;
1365
1366         struct dpif_flow flows[REVALIDATE_MAX_BATCH];
1367         const struct dpif_flow *f;
1368         int n_dumped;
1369
1370         long long int max_idle;
1371         long long int now;
1372         size_t n_dp_flows;
1373         bool kill_them_all;
1374
1375         n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
1376         if (!n_dumped) {
1377             break;
1378         }
1379
1380         now = time_msec();
1381
1382         /* In normal operation we want to keep flows around until they have
1383          * been idle for 'ofproto_max_idle' milliseconds.  However:
1384          *
1385          *     - If the number of datapath flows climbs above 'flow_limit',
1386          *       drop that down to 100 ms to try to bring the flows down to
1387          *       the limit.
1388          *
1389          *     - If the number of datapath flows climbs above twice
1390          *       'flow_limit', delete all the datapath flows as an emergency
1391          *       measure.  (We reassess this condition for the next batch of
1392          *       datapath flows, so we will recover before all the flows are
1393          *       gone.) */
1394         n_dp_flows = udpif_get_n_flows(udpif);
1395         kill_them_all = n_dp_flows > flow_limit * 2;
1396         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
1397
1398         for (f = flows; f < &flows[n_dumped]; f++) {
1399             long long int used = f->stats.used;
1400             struct udpif_key *ukey;
1401             bool already_dumped, keep;
1402
1403             if (!ukey_acquire(udpif, f->key, f->key_len, used, &ukey)) {
1404                 /* We couldn't acquire the ukey. This means that
1405                  * another revalidator is processing this flow
1406                  * concurrently, so don't bother processing it. */
1407                 COVERAGE_INC(upcall_duplicate_flow);
1408                 continue;
1409             }
1410
1411             already_dumped = ukey->dump_seq == dump_seq;
1412             if (already_dumped) {
1413                 /* The flow has already been dumped and handled by another
1414                  * revalidator during this flow dump operation. Skip it. */
1415                 COVERAGE_INC(upcall_duplicate_flow);
1416                 ovs_mutex_unlock(&ukey->mutex);
1417                 continue;
1418             }
1419
1420             if (!used) {
1421                 used = ukey->created;
1422             }
1423             if (kill_them_all || (used && used < now - max_idle)) {
1424                 keep = false;
1425             } else {
1426                 keep = revalidate_ukey(udpif, ukey, f);
1427             }
1428             ukey->dump_seq = dump_seq;
1429             ukey->flow_exists = keep;
1430
1431             if (!keep) {
1432                 dump_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
1433             }
1434             ovs_mutex_unlock(&ukey->mutex);
1435         }
1436
1437         if (n_ops) {
1438             push_dump_ops__(udpif, ops, n_ops);
1439         }
1440     }
1441     dpif_flow_dump_thread_destroy(dump_thread);
1442 }
1443
1444 /* Called with exclusive access to 'revalidator' and 'ukey'. */
1445 static bool
1446 handle_missed_revalidation(struct revalidator *revalidator,
1447                            struct udpif_key *ukey)
1448     OVS_NO_THREAD_SAFETY_ANALYSIS
1449 {
1450     struct udpif *udpif = revalidator->udpif;
1451     struct dpif_flow flow;
1452     struct ofpbuf *buf;
1453     bool keep = false;
1454
1455     COVERAGE_INC(revalidate_missed_dp_flow);
1456
1457     if (!dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &buf, &flow)) {
1458         keep = revalidate_ukey(udpif, ukey, &flow);
1459         ofpbuf_delete(buf);
1460     }
1461
1462     return keep;
1463 }
1464
1465 static void
1466 revalidator_sweep__(struct revalidator *revalidator, bool purge)
1467     OVS_NO_THREAD_SAFETY_ANALYSIS
1468 {
1469     struct dump_op ops[REVALIDATE_MAX_BATCH];
1470     struct udpif_key *ukey, *next;
1471     size_t n_ops;
1472     uint64_t dump_seq;
1473
1474     n_ops = 0;
1475     dump_seq = seq_read(revalidator->udpif->dump_seq);
1476
1477     /* During garbage collection, this revalidator completely owns its ukeys
1478      * map, and therefore doesn't need to do any locking. */
1479     HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
1480         if (ukey->flow_exists
1481             && (purge
1482                 || (ukey->dump_seq != dump_seq
1483                     && revalidator->udpif->need_revalidate
1484                     && !handle_missed_revalidation(revalidator, ukey)))) {
1485             struct dump_op *op = &ops[n_ops++];
1486
1487             dump_op_init(op, ukey->key, ukey->key_len, ukey);
1488             if (n_ops == REVALIDATE_MAX_BATCH) {
1489                 push_dump_ops(revalidator, ops, n_ops);
1490                 n_ops = 0;
1491             }
1492         } else if (!ukey->flow_exists) {
1493             ukey_delete(revalidator, ukey);
1494         }
1495     }
1496
1497     if (n_ops) {
1498         push_dump_ops(revalidator, ops, n_ops);
1499     }
1500 }
1501
1502 static void
1503 revalidator_sweep(struct revalidator *revalidator)
1504 {
1505     revalidator_sweep__(revalidator, false);
1506 }
1507
1508 static void
1509 revalidator_purge(struct revalidator *revalidator)
1510 {
1511     revalidator_sweep__(revalidator, true);
1512 }
1513 \f
1514 static void
1515 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
1516                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
1517 {
1518     struct ds ds = DS_EMPTY_INITIALIZER;
1519     struct udpif *udpif;
1520
1521     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1522         unsigned int flow_limit;
1523         size_t i;
1524
1525         atomic_read(&udpif->flow_limit, &flow_limit);
1526
1527         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
1528         ds_put_format(&ds, "\tflows         : (current %lu)"
1529             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
1530             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
1531         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
1532
1533         ds_put_char(&ds, '\n');
1534         for (i = 0; i < n_revalidators; i++) {
1535             struct revalidator *revalidator = &udpif->revalidators[i];
1536
1537             ovs_mutex_lock(&udpif->ukeys[i].mutex);
1538             ds_put_format(&ds, "\t%u: (keys %"PRIuSIZE")\n",
1539                           revalidator->id, hmap_count(&udpif->ukeys[i].hmap));
1540             ovs_mutex_unlock(&udpif->ukeys[i].mutex);
1541         }
1542     }
1543
1544     unixctl_command_reply(conn, ds_cstr(&ds));
1545     ds_destroy(&ds);
1546 }
1547
1548 /* Disable using the megaflows.
1549  *
1550  * This command is only needed for advanced debugging, so it's not
1551  * documented in the man page. */
1552 static void
1553 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
1554                                  int argc OVS_UNUSED,
1555                                  const char *argv[] OVS_UNUSED,
1556                                  void *aux OVS_UNUSED)
1557 {
1558     atomic_store(&enable_megaflows, false);
1559     udpif_flush_all_datapaths();
1560     unixctl_command_reply(conn, "megaflows disabled");
1561 }
1562
1563 /* Re-enable using megaflows.
1564  *
1565  * This command is only needed for advanced debugging, so it's not
1566  * documented in the man page. */
1567 static void
1568 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
1569                                 int argc OVS_UNUSED,
1570                                 const char *argv[] OVS_UNUSED,
1571                                 void *aux OVS_UNUSED)
1572 {
1573     atomic_store(&enable_megaflows, true);
1574     udpif_flush_all_datapaths();
1575     unixctl_command_reply(conn, "megaflows enabled");
1576 }
1577
1578 /* Set the flow limit.
1579  *
1580  * This command is only needed for advanced debugging, so it's not
1581  * documented in the man page. */
1582 static void
1583 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
1584                               int argc OVS_UNUSED,
1585                               const char *argv[] OVS_UNUSED,
1586                               void *aux OVS_UNUSED)
1587 {
1588     struct ds ds = DS_EMPTY_INITIALIZER;
1589     struct udpif *udpif;
1590     unsigned int flow_limit = atoi(argv[1]);
1591
1592     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1593         atomic_store(&udpif->flow_limit, flow_limit);
1594     }
1595     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
1596     unixctl_command_reply(conn, ds_cstr(&ds));
1597     ds_destroy(&ds);
1598 }
1599
1600 static void
1601 upcall_unixctl_dump_wait(struct unixctl_conn *conn,
1602                          int argc OVS_UNUSED,
1603                          const char *argv[] OVS_UNUSED,
1604                          void *aux OVS_UNUSED)
1605 {
1606     if (list_is_singleton(&all_udpifs)) {
1607         struct udpif *udpif;
1608         size_t len;
1609
1610         udpif = OBJECT_CONTAINING(list_front(&all_udpifs), udpif, list_node);
1611         len = (udpif->n_conns + 1) * sizeof *udpif->conns;
1612         udpif->conn_seq = seq_read(udpif->dump_seq);
1613         udpif->conns = xrealloc(udpif->conns, len);
1614         udpif->conns[udpif->n_conns++] = conn;
1615     } else {
1616         unixctl_command_reply_error(conn, "can't wait on multiple udpifs.");
1617     }
1618 }