ofproto-dpif: Use DPIF_FP_CREATE but not DPIF_FP_MODIFY.
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "dpif.h"
25 #include "dynamic-string.h"
26 #include "fail-open.h"
27 #include "guarded-list.h"
28 #include "latch.h"
29 #include "list.h"
30 #include "netlink.h"
31 #include "ofpbuf.h"
32 #include "ofproto-dpif-ipfix.h"
33 #include "ofproto-dpif-sflow.h"
34 #include "ofproto-dpif-xlate.h"
35 #include "ovs-rcu.h"
36 #include "packets.h"
37 #include "poll-loop.h"
38 #include "seq.h"
39 #include "unixctl.h"
40 #include "vlog.h"
41
42 #define MAX_QUEUE_LENGTH 512
43 #define UPCALL_MAX_BATCH 64
44 #define REVALIDATE_MAX_BATCH 50
45
46 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
47
48 COVERAGE_DEFINE(upcall_duplicate_flow);
49 COVERAGE_DEFINE(revalidate_missed_dp_flow);
50
51 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
52  * and possibly sets up a kernel flow as a cache. */
53 struct handler {
54     struct udpif *udpif;               /* Parent udpif. */
55     pthread_t thread;                  /* Thread ID. */
56     uint32_t handler_id;               /* Handler id. */
57 };
58
59 /* A thread that processes datapath flows, updates OpenFlow statistics, and
60  * updates or removes them if necessary. */
61 struct revalidator {
62     struct udpif *udpif;               /* Parent udpif. */
63     pthread_t thread;                  /* Thread ID. */
64     unsigned int id;                   /* ovsthread_id_self(). */
65     struct hmap *ukeys;                /* Points into udpif->ukeys for this
66                                           revalidator. Used for GC phase. */
67 };
68
69 /* An upcall handler for ofproto_dpif.
70  *
71  * udpif keeps records of two kind of logically separate units:
72  *
73  * upcall handling
74  * ---------------
75  *
76  *    - An array of 'struct handler's for upcall handling and flow
77  *      installation.
78  *
79  * flow revalidation
80  * -----------------
81  *
82  *    - Revalidation threads which read the datapath flow table and maintains
83  *      them.
84  */
85 struct udpif {
86     struct list list_node;             /* In all_udpifs list. */
87
88     struct dpif *dpif;                 /* Datapath handle. */
89     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
90
91     uint32_t secret;                   /* Random seed for upcall hash. */
92
93     struct handler *handlers;          /* Upcall handlers. */
94     size_t n_handlers;
95
96     struct revalidator *revalidators;  /* Flow revalidators. */
97     size_t n_revalidators;
98
99     struct latch exit_latch;           /* Tells child threads to exit. */
100
101     /* Revalidation. */
102     struct seq *reval_seq;             /* Incremented to force revalidation. */
103     bool need_revalidate;              /* As indicated by 'reval_seq'. */
104     bool reval_exit;                   /* Set by leader on 'exit_latch. */
105     struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
106     struct dpif_flow_dump *dump;       /* DPIF flow dump state. */
107     long long int dump_duration;       /* Duration of the last flow dump. */
108     struct seq *dump_seq;              /* Increments each dump iteration. */
109
110     /* There are 'n_revalidators' ukey hmaps. Each revalidator retains a
111      * reference to one of these for garbage collection.
112      *
113      * During the flow dump phase, revalidators insert into these with a random
114      * distribution. During the garbage collection phase, each revalidator
115      * takes care of garbage collecting one of these hmaps. */
116     struct {
117         struct ovs_mutex mutex;        /* Guards the following. */
118         struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
119     } *ukeys;
120
121     /* Datapath flow statistics. */
122     unsigned int max_n_flows;
123     unsigned int avg_n_flows;
124
125     /* Following fields are accessed and modified by different threads. */
126     atomic_uint flow_limit;            /* Datapath flow hard limit. */
127
128     /* n_flows_mutex prevents multiple threads updating these concurrently. */
129     atomic_ulong n_flows;           /* Number of flows in the datapath. */
130     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
131     struct ovs_mutex n_flows_mutex;
132
133     /* Following fields are accessed and modified only from the main thread. */
134     struct unixctl_conn **conns;       /* Connections waiting on dump_seq. */
135     uint64_t conn_seq;                 /* Corresponds to 'dump_seq' when
136                                           conns[n_conns-1] was stored. */
137     size_t n_conns;                    /* Number of connections waiting. */
138 };
139
140 enum upcall_type {
141     BAD_UPCALL,                 /* Some kind of bug somewhere. */
142     MISS_UPCALL,                /* A flow miss.  */
143     SFLOW_UPCALL,               /* sFlow sample. */
144     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
145     IPFIX_UPCALL                /* Per-bridge sampling. */
146 };
147
148 struct upcall {
149     struct ofproto_dpif *ofproto;
150
151     struct flow flow;
152     const struct nlattr *key;
153     size_t key_len;
154     enum dpif_upcall_type upcall_type;
155     struct dpif_flow_stats stats;
156     odp_port_t odp_in_port;
157
158     uint64_t slow_path_buf[128 / 8];
159     struct odputil_keybuf mask_buf;
160
161     struct xlate_out xout;
162
163     /* Raw upcall plus data for keeping track of the memory backing it. */
164     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
165     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
166     uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
167 };
168
169 /* 'udpif_key's are responsible for tracking the little bit of state udpif
170  * needs to do flow expiration which can't be pulled directly from the
171  * datapath.  They may be created or maintained by any revalidator during
172  * the dump phase, but are owned by a single revalidator, and are destroyed
173  * by that revalidator during the garbage-collection phase.
174  *
175  * While some elements of a udpif_key are protected by a mutex, the ukey itself
176  * is not.  Therefore it is not safe to destroy a udpif_key except when all
177  * revalidators are in garbage collection phase, or they aren't running. */
178 struct udpif_key {
179     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
180
181     /* These elements are read only once created, and therefore aren't
182      * protected by a mutex. */
183     const struct nlattr *key;      /* Datapath flow key. */
184     size_t key_len;                /* Length of 'key'. */
185
186     struct ovs_mutex mutex;                   /* Guards the following. */
187     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
188     long long int created OVS_GUARDED;        /* Estimate of creation time. */
189     uint64_t dump_seq OVS_GUARDED;            /* Tracks udpif->dump_seq. */
190     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
191                                                  once. */
192
193     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
194                                                * are affected by this ukey.
195                                                * Used for stats and learning.*/
196     struct odputil_keybuf key_buf;            /* Memory for 'key'. */
197 };
198
199 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
200 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
201
202 static size_t read_upcalls(struct handler *,
203                            struct upcall upcalls[UPCALL_MAX_BATCH]);
204 static void free_upcall(struct upcall *);
205 static int convert_upcall(struct udpif *, struct upcall *);
206 static void handle_upcalls(struct udpif *, struct upcall *, size_t n_upcalls);
207 static void udpif_stop_threads(struct udpif *);
208 static void udpif_start_threads(struct udpif *, size_t n_handlers,
209                                 size_t n_revalidators);
210 static void *udpif_upcall_handler(void *);
211 static void *udpif_revalidator(void *);
212 static unsigned long udpif_get_n_flows(struct udpif *);
213 static void revalidate(struct revalidator *);
214 static void revalidator_sweep(struct revalidator *);
215 static void revalidator_purge(struct revalidator *);
216 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
217                                 const char *argv[], void *aux);
218 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
219                                              const char *argv[], void *aux);
220 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
221                                             const char *argv[], void *aux);
222 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
223                                             const char *argv[], void *aux);
224 static void upcall_unixctl_dump_wait(struct unixctl_conn *conn, int argc,
225                                      const char *argv[], void *aux);
226
227 static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
228                                      long long int used);
229 static struct udpif_key *ukey_lookup(struct udpif *udpif,
230                                      const struct nlattr *key, size_t key_len,
231                                      uint32_t hash);
232 static bool ukey_acquire(struct udpif *udpif, const struct nlattr *key,
233                          size_t key_len, long long int used,
234                          struct udpif_key **result);
235 static void ukey_delete(struct revalidator *, struct udpif_key *);
236
237 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
238
239 struct udpif *
240 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
241 {
242     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
243     struct udpif *udpif = xzalloc(sizeof *udpif);
244
245     if (ovsthread_once_start(&once)) {
246         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
247                                  NULL);
248         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
249                                  upcall_unixctl_disable_megaflows, NULL);
250         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
251                                  upcall_unixctl_enable_megaflows, NULL);
252         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
253                                  upcall_unixctl_set_flow_limit, NULL);
254         unixctl_command_register("revalidator/wait", "", 0, 0,
255                                  upcall_unixctl_dump_wait, NULL);
256         ovsthread_once_done(&once);
257     }
258
259     udpif->dpif = dpif;
260     udpif->backer = backer;
261     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
262     udpif->secret = random_uint32();
263     udpif->reval_seq = seq_create();
264     udpif->dump_seq = seq_create();
265     latch_init(&udpif->exit_latch);
266     list_push_back(&all_udpifs, &udpif->list_node);
267     atomic_init(&udpif->n_flows, 0);
268     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
269     ovs_mutex_init(&udpif->n_flows_mutex);
270
271     dpif_register_upcall_cb(dpif, exec_upcalls);
272
273     return udpif;
274 }
275
276 void
277 udpif_run(struct udpif *udpif)
278 {
279     if (udpif->conns && udpif->conn_seq != seq_read(udpif->dump_seq)) {
280         int i;
281
282         for (i = 0; i < udpif->n_conns; i++) {
283             unixctl_command_reply(udpif->conns[i], NULL);
284         }
285         free(udpif->conns);
286         udpif->conns = NULL;
287         udpif->n_conns = 0;
288     }
289 }
290
291 void
292 udpif_destroy(struct udpif *udpif)
293 {
294     udpif_stop_threads(udpif);
295
296     list_remove(&udpif->list_node);
297     latch_destroy(&udpif->exit_latch);
298     seq_destroy(udpif->reval_seq);
299     seq_destroy(udpif->dump_seq);
300     ovs_mutex_destroy(&udpif->n_flows_mutex);
301     free(udpif);
302 }
303
304 /* Stops the handler and revalidator threads, must be enclosed in
305  * ovsrcu quiescent state unless when destroying udpif. */
306 static void
307 udpif_stop_threads(struct udpif *udpif)
308 {
309     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
310         size_t i;
311
312         latch_set(&udpif->exit_latch);
313
314         for (i = 0; i < udpif->n_handlers; i++) {
315             struct handler *handler = &udpif->handlers[i];
316
317             xpthread_join(handler->thread, NULL);
318         }
319
320         for (i = 0; i < udpif->n_revalidators; i++) {
321             xpthread_join(udpif->revalidators[i].thread, NULL);
322         }
323
324         dpif_disable_upcall(udpif->dpif);
325
326         for (i = 0; i < udpif->n_revalidators; i++) {
327             struct revalidator *revalidator = &udpif->revalidators[i];
328
329             /* Delete ukeys, and delete all flows from the datapath to prevent
330              * double-counting stats. */
331             revalidator_purge(revalidator);
332
333             hmap_destroy(&udpif->ukeys[i].hmap);
334             ovs_mutex_destroy(&udpif->ukeys[i].mutex);
335         }
336
337         latch_poll(&udpif->exit_latch);
338
339         ovs_barrier_destroy(&udpif->reval_barrier);
340
341         free(udpif->revalidators);
342         udpif->revalidators = NULL;
343         udpif->n_revalidators = 0;
344
345         free(udpif->handlers);
346         udpif->handlers = NULL;
347         udpif->n_handlers = 0;
348
349         free(udpif->ukeys);
350         udpif->ukeys = NULL;
351     }
352 }
353
354 /* Starts the handler and revalidator threads, must be enclosed in
355  * ovsrcu quiescent state. */
356 static void
357 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
358                     size_t n_revalidators)
359 {
360     if (udpif && n_handlers && n_revalidators) {
361         size_t i;
362
363         udpif->n_handlers = n_handlers;
364         udpif->n_revalidators = n_revalidators;
365
366         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
367         for (i = 0; i < udpif->n_handlers; i++) {
368             struct handler *handler = &udpif->handlers[i];
369
370             handler->udpif = udpif;
371             handler->handler_id = i;
372             handler->thread = ovs_thread_create(
373                 "handler", udpif_upcall_handler, handler);
374         }
375
376         dpif_enable_upcall(udpif->dpif);
377
378         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
379         udpif->reval_exit = false;
380         udpif->revalidators = xzalloc(udpif->n_revalidators
381                                       * sizeof *udpif->revalidators);
382         udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
383         for (i = 0; i < udpif->n_revalidators; i++) {
384             struct revalidator *revalidator = &udpif->revalidators[i];
385
386             revalidator->udpif = udpif;
387             hmap_init(&udpif->ukeys[i].hmap);
388             ovs_mutex_init(&udpif->ukeys[i].mutex);
389             revalidator->ukeys = &udpif->ukeys[i].hmap;
390             revalidator->thread = ovs_thread_create(
391                 "revalidator", udpif_revalidator, revalidator);
392         }
393     }
394 }
395
396 /* Tells 'udpif' how many threads it should use to handle upcalls.
397  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
398  * datapath handle must have packet reception enabled before starting
399  * threads. */
400 void
401 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
402                   size_t n_revalidators)
403 {
404     ovs_assert(udpif);
405     ovs_assert(n_handlers && n_revalidators);
406
407     ovsrcu_quiesce_start();
408     if (udpif->n_handlers != n_handlers
409         || udpif->n_revalidators != n_revalidators) {
410         udpif_stop_threads(udpif);
411     }
412
413     if (!udpif->handlers && !udpif->revalidators) {
414         int error;
415
416         error = dpif_handlers_set(udpif->dpif, n_handlers);
417         if (error) {
418             VLOG_ERR("failed to configure handlers in dpif %s: %s",
419                      dpif_name(udpif->dpif), ovs_strerror(error));
420             return;
421         }
422
423         udpif_start_threads(udpif, n_handlers, n_revalidators);
424     }
425     ovsrcu_quiesce_end();
426 }
427
428 /* Waits for all ongoing upcall translations to complete.  This ensures that
429  * there are no transient references to any removed ofprotos (or other
430  * objects).  In particular, this should be called after an ofproto is removed
431  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
432 void
433 udpif_synchronize(struct udpif *udpif)
434 {
435     /* This is stronger than necessary.  It would be sufficient to ensure
436      * (somehow) that each handler and revalidator thread had passed through
437      * its main loop once. */
438     size_t n_handlers = udpif->n_handlers;
439     size_t n_revalidators = udpif->n_revalidators;
440
441     ovsrcu_quiesce_start();
442     udpif_stop_threads(udpif);
443     udpif_start_threads(udpif, n_handlers, n_revalidators);
444     ovsrcu_quiesce_end();
445 }
446
447 /* Notifies 'udpif' that something changed which may render previous
448  * xlate_actions() results invalid. */
449 void
450 udpif_revalidate(struct udpif *udpif)
451 {
452     seq_change(udpif->reval_seq);
453 }
454
455 /* Returns a seq which increments every time 'udpif' pulls stats from the
456  * datapath.  Callers can use this to get a sense of when might be a good time
457  * to do periodic work which relies on relatively up to date statistics. */
458 struct seq *
459 udpif_dump_seq(struct udpif *udpif)
460 {
461     return udpif->dump_seq;
462 }
463
464 void
465 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
466 {
467     size_t i;
468
469     simap_increase(usage, "handlers", udpif->n_handlers);
470
471     simap_increase(usage, "revalidators", udpif->n_revalidators);
472     for (i = 0; i < udpif->n_revalidators; i++) {
473         ovs_mutex_lock(&udpif->ukeys[i].mutex);
474         simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
475         ovs_mutex_unlock(&udpif->ukeys[i].mutex);
476     }
477 }
478
479 /* Remove flows from a single datapath. */
480 void
481 udpif_flush(struct udpif *udpif)
482 {
483     size_t n_handlers, n_revalidators;
484
485     n_handlers = udpif->n_handlers;
486     n_revalidators = udpif->n_revalidators;
487
488     ovsrcu_quiesce_start();
489
490     udpif_stop_threads(udpif);
491     dpif_flow_flush(udpif->dpif);
492     udpif_start_threads(udpif, n_handlers, n_revalidators);
493
494     ovsrcu_quiesce_end();
495 }
496
497 /* Removes all flows from all datapaths. */
498 static void
499 udpif_flush_all_datapaths(void)
500 {
501     struct udpif *udpif;
502
503     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
504         udpif_flush(udpif);
505     }
506 }
507
508 \f
509 static unsigned long
510 udpif_get_n_flows(struct udpif *udpif)
511 {
512     long long int time, now;
513     unsigned long flow_count;
514
515     now = time_msec();
516     atomic_read(&udpif->n_flows_timestamp, &time);
517     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
518         struct dpif_dp_stats stats;
519
520         atomic_store(&udpif->n_flows_timestamp, now);
521         dpif_get_dp_stats(udpif->dpif, &stats);
522         flow_count = stats.n_flows;
523         atomic_store(&udpif->n_flows, flow_count);
524         ovs_mutex_unlock(&udpif->n_flows_mutex);
525     } else {
526         atomic_read(&udpif->n_flows, &flow_count);
527     }
528     return flow_count;
529 }
530
531 /* The upcall handler thread tries to read a batch of UPCALL_MAX_BATCH
532  * upcalls from dpif, processes the batch and installs corresponding flows
533  * in dpif. */
534 static void *
535 udpif_upcall_handler(void *arg)
536 {
537     struct handler *handler = arg;
538     struct udpif *udpif = handler->udpif;
539
540     while (!latch_is_set(&handler->udpif->exit_latch)) {
541         struct upcall upcalls[UPCALL_MAX_BATCH];
542         size_t n_upcalls, i;
543
544         n_upcalls = read_upcalls(handler, upcalls);
545         if (!n_upcalls) {
546             dpif_recv_wait(udpif->dpif, handler->handler_id);
547             latch_wait(&udpif->exit_latch);
548             poll_block();
549         } else {
550             handle_upcalls(handler->udpif, upcalls, n_upcalls);
551
552             for (i = 0; i < n_upcalls; i++) {
553                 free_upcall(&upcalls[i]);
554             }
555         }
556         coverage_clear();
557     }
558
559     return NULL;
560 }
561
562 static void *
563 udpif_revalidator(void *arg)
564 {
565     /* Used by all revalidators. */
566     struct revalidator *revalidator = arg;
567     struct udpif *udpif = revalidator->udpif;
568     bool leader = revalidator == &udpif->revalidators[0];
569
570     /* Used only by the leader. */
571     long long int start_time = 0;
572     uint64_t last_reval_seq = 0;
573     unsigned int flow_limit = 0;
574     size_t n_flows = 0;
575
576     revalidator->id = ovsthread_id_self();
577     for (;;) {
578         if (leader) {
579             uint64_t reval_seq;
580
581             reval_seq = seq_read(udpif->reval_seq);
582             udpif->need_revalidate = last_reval_seq != reval_seq;
583             last_reval_seq = reval_seq;
584
585             n_flows = udpif_get_n_flows(udpif);
586             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
587             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
588
589             /* Only the leader checks the exit latch to prevent a race where
590              * some threads think it's true and exit and others think it's
591              * false and block indefinitely on the reval_barrier */
592             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
593
594             start_time = time_msec();
595             if (!udpif->reval_exit) {
596                 udpif->dump = dpif_flow_dump_create(udpif->dpif);
597             }
598         }
599
600         /* Wait for the leader to start the flow dump. */
601         ovs_barrier_block(&udpif->reval_barrier);
602         if (udpif->reval_exit) {
603             break;
604         }
605         revalidate(revalidator);
606
607         /* Wait for all flows to have been dumped before we garbage collect. */
608         ovs_barrier_block(&udpif->reval_barrier);
609         revalidator_sweep(revalidator);
610
611         /* Wait for all revalidators to finish garbage collection. */
612         ovs_barrier_block(&udpif->reval_barrier);
613
614         if (leader) {
615             long long int duration;
616
617             dpif_flow_dump_destroy(udpif->dump);
618             seq_change(udpif->dump_seq);
619
620             duration = MAX(time_msec() - start_time, 1);
621             atomic_read(&udpif->flow_limit, &flow_limit);
622             udpif->dump_duration = duration;
623             if (duration > 2000) {
624                 flow_limit /= duration / 1000;
625             } else if (duration > 1300) {
626                 flow_limit = flow_limit * 3 / 4;
627             } else if (duration < 1000 && n_flows > 2000
628                        && flow_limit < n_flows * 1000 / duration) {
629                 flow_limit += 1000;
630             }
631             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
632             atomic_store(&udpif->flow_limit, flow_limit);
633
634             if (duration > 2000) {
635                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
636                           duration);
637             }
638
639             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
640             seq_wait(udpif->reval_seq, last_reval_seq);
641             latch_wait(&udpif->exit_latch);
642             poll_block();
643         }
644     }
645
646     return NULL;
647 }
648 \f
649 static enum upcall_type
650 classify_upcall(const struct upcall *upcall)
651 {
652     const struct dpif_upcall *dpif_upcall = &upcall->dpif_upcall;
653     union user_action_cookie cookie;
654     size_t userdata_len;
655
656     /* First look at the upcall type. */
657     switch (dpif_upcall->type) {
658     case DPIF_UC_ACTION:
659         break;
660
661     case DPIF_UC_MISS:
662         return MISS_UPCALL;
663
664     case DPIF_N_UC_TYPES:
665     default:
666         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
667                      dpif_upcall->type);
668         return BAD_UPCALL;
669     }
670
671     /* "action" upcalls need a closer look. */
672     if (!dpif_upcall->userdata) {
673         VLOG_WARN_RL(&rl, "action upcall missing cookie");
674         return BAD_UPCALL;
675     }
676     userdata_len = nl_attr_get_size(dpif_upcall->userdata);
677     if (userdata_len < sizeof cookie.type
678         || userdata_len > sizeof cookie) {
679         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
680                      userdata_len);
681         return BAD_UPCALL;
682     }
683     memset(&cookie, 0, sizeof cookie);
684     memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
685     if (userdata_len == MAX(8, sizeof cookie.sflow)
686         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
687         return SFLOW_UPCALL;
688     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
689                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
690         return MISS_UPCALL;
691     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
692                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
693         return FLOW_SAMPLE_UPCALL;
694     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
695                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
696         return IPFIX_UPCALL;
697     } else {
698         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
699                      " and size %"PRIuSIZE, cookie.type, userdata_len);
700         return BAD_UPCALL;
701     }
702 }
703
704 /* Calculates slow path actions for 'xout'.  'buf' must statically be
705  * initialized with at least 128 bytes of space. */
706 static void
707 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
708                   struct flow *flow, odp_port_t odp_in_port,
709                   struct ofpbuf *buf)
710 {
711     union user_action_cookie cookie;
712     odp_port_t port;
713     uint32_t pid;
714
715     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
716     cookie.slow_path.unused = 0;
717     cookie.slow_path.reason = xout->slow;
718
719     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
720         ? ODPP_NONE
721         : odp_in_port;
722     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
723     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
724 }
725
726 static void
727 upcall_init(struct upcall *upcall, struct flow *flow, struct ofpbuf *packet,
728             struct ofproto_dpif *ofproto, struct dpif_upcall *dupcall,
729             odp_port_t odp_in_port)
730 {
731     struct pkt_metadata md = pkt_metadata_from_flow(flow);
732     struct xlate_in xin;
733
734     flow_extract(packet, &md, &upcall->flow);
735
736     upcall->ofproto = ofproto;
737     upcall->key = dupcall->key;
738     upcall->key_len = dupcall->key_len;
739     upcall->upcall_type = dupcall->type;
740     upcall->stats.n_packets = 1;
741     upcall->stats.n_bytes = ofpbuf_size(packet);
742     upcall->stats.used = time_msec();
743     upcall->stats.tcp_flags = ntohs(upcall->flow.tcp_flags);
744     upcall->odp_in_port = odp_in_port;
745
746     xlate_in_init(&xin, upcall->ofproto, &upcall->flow, NULL,
747                   upcall->stats.tcp_flags, packet);
748
749     if (upcall->upcall_type == DPIF_UC_MISS) {
750         xin.resubmit_stats = &upcall->stats;
751     } else {
752         /* For non-miss upcalls, there's a flow in the datapath which this
753          * packet was accounted to.  Presumably the revalidators will deal
754          * with pushing its stats eventually. */
755     }
756
757     xlate_actions(&xin, &upcall->xout);
758 }
759
760 static void
761 free_upcall(struct upcall *upcall)
762 {
763     xlate_out_uninit(&upcall->xout);
764     ofpbuf_uninit(&upcall->dpif_upcall.packet);
765     ofpbuf_uninit(&upcall->upcall_buf);
766 }
767
768 static struct udpif *
769 find_udpif(struct dpif *dpif)
770 {
771     struct udpif *udpif;
772
773     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
774         if (udpif->dpif == dpif) {
775             return udpif;
776         }
777     }
778     return NULL;
779 }
780
781 void
782 exec_upcalls(struct dpif *dpif, struct dpif_upcall *dupcalls,
783              struct ofpbuf *bufs, int cnt)
784 {
785     struct upcall upcalls[UPCALL_MAX_BATCH];
786     struct udpif *udpif;
787     int i, j;
788
789     udpif = find_udpif(dpif);
790     ovs_assert(udpif);
791
792     for (i = 0; i < cnt; i += UPCALL_MAX_BATCH) {
793         size_t n_upcalls = 0;
794         for (j = i; j < MIN(i + UPCALL_MAX_BATCH, cnt); j++) {
795             struct upcall *upcall = &upcalls[n_upcalls];
796             struct dpif_upcall *dupcall = &dupcalls[j];
797             struct ofpbuf *buf = &bufs[j];
798
799             upcall->dpif_upcall = *dupcall;
800             upcall->upcall_buf = *buf;
801
802             dpif_print_packet(dpif, dupcall);
803             if (!convert_upcall(udpif, upcall)) {
804                 n_upcalls += 1;
805             }
806         }
807
808         if (n_upcalls) {
809             handle_upcalls(udpif, upcalls, n_upcalls);
810             for (j = 0; j < n_upcalls; j++) {
811                 free_upcall(&upcalls[j]);
812             }
813         }
814     }
815 }
816
817 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
818  * read. */
819 static size_t
820 read_upcalls(struct handler *handler,
821              struct upcall upcalls[UPCALL_MAX_BATCH])
822 {
823     struct udpif *udpif = handler->udpif;
824     size_t i;
825     size_t n_upcalls = 0;
826
827     /* Try reading UPCALL_MAX_BATCH upcalls from dpif. */
828     for (i = 0; i < UPCALL_MAX_BATCH; i++) {
829         struct upcall *upcall = &upcalls[n_upcalls];
830         int error;
831
832         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
833                         sizeof upcall->upcall_stub);
834         error = dpif_recv(udpif->dpif, handler->handler_id,
835                           &upcall->dpif_upcall, &upcall->upcall_buf);
836         if (error) {
837             ofpbuf_uninit(&upcall->upcall_buf);
838             break;
839         }
840
841         if (!convert_upcall(udpif, upcall)) {
842             n_upcalls += 1;
843         }
844     }
845     return n_upcalls;
846 }
847
848 static int
849 convert_upcall(struct udpif *udpif, struct upcall *upcall)
850 {
851     struct dpif_upcall *dupcall = &upcall->dpif_upcall;
852     struct ofpbuf *packet = &dupcall->packet;
853     struct ofproto_dpif *ofproto;
854     struct dpif_sflow *sflow;
855     struct dpif_ipfix *ipfix;
856     struct flow flow;
857     enum upcall_type type;
858     odp_port_t odp_in_port;
859     int error;
860
861     error = xlate_receive(udpif->backer, packet, dupcall->key,
862                           dupcall->key_len, &flow,
863                           &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
864
865     if (error) {
866         if (error == ENODEV) {
867             /* Received packet on datapath port for which we couldn't
868              * associate an ofproto.  This can happen if a port is removed
869              * while traffic is being received.  Print a rate-limited
870              * message in case it happens frequently.  Install a drop flow
871              * so that future packets of the flow are inexpensively dropped
872              * in the kernel. */
873             VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
874                          "port %"PRIu32, odp_in_port);
875             dpif_flow_put(udpif->dpif, DPIF_FP_CREATE,
876                           dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
877                           NULL);
878         }
879         goto destroy_upcall;
880     }
881
882     type = classify_upcall(upcall);
883     if (type == MISS_UPCALL) {
884         upcall_init(upcall, &flow, packet, ofproto, dupcall, odp_in_port);
885         return error;
886     }
887
888     switch (type) {
889     case SFLOW_UPCALL:
890         if (sflow) {
891             union user_action_cookie cookie;
892
893             memset(&cookie, 0, sizeof cookie);
894             memcpy(&cookie, nl_attr_get(dupcall->userdata),
895                    sizeof cookie.sflow);
896             dpif_sflow_received(sflow, packet, &flow, odp_in_port,
897                                 &cookie);
898         }
899         break;
900     case IPFIX_UPCALL:
901         if (ipfix) {
902             dpif_ipfix_bridge_sample(ipfix, packet, &flow);
903         }
904         break;
905     case FLOW_SAMPLE_UPCALL:
906         if (ipfix) {
907             union user_action_cookie cookie;
908
909             memset(&cookie, 0, sizeof cookie);
910             memcpy(&cookie, nl_attr_get(dupcall->userdata),
911                    sizeof cookie.flow_sample);
912
913             /* The flow reflects exactly the contents of the packet.
914              * Sample the packet using it. */
915             dpif_ipfix_flow_sample(ipfix, packet, &flow,
916                                    cookie.flow_sample.collector_set_id,
917                                    cookie.flow_sample.probability,
918                                    cookie.flow_sample.obs_domain_id,
919                                    cookie.flow_sample.obs_point_id);
920         }
921         break;
922     case BAD_UPCALL:
923         break;
924     case MISS_UPCALL:
925         OVS_NOT_REACHED();
926     }
927
928     dpif_ipfix_unref(ipfix);
929     dpif_sflow_unref(sflow);
930     error = EAGAIN;
931
932 destroy_upcall:
933     ofpbuf_uninit(&upcall->dpif_upcall.packet);
934     ofpbuf_uninit(&upcall->upcall_buf);
935     return error;
936 }
937
938 static void
939 handle_upcalls(struct udpif *udpif, struct upcall *upcalls,
940                size_t n_upcalls)
941 {
942     struct dpif_op *opsp[UPCALL_MAX_BATCH * 2];
943     struct dpif_op ops[UPCALL_MAX_BATCH * 2];
944     size_t n_ops, i;
945     unsigned int flow_limit;
946     bool fail_open, may_put;
947
948     atomic_read(&udpif->flow_limit, &flow_limit);
949     may_put = udpif_get_n_flows(udpif) < flow_limit;
950
951     /* Handle the packets individually in order of arrival.
952      *
953      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
954      *     processes received packets for these protocols.
955      *
956      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
957      *     controller.
958      *
959      * The loop fills 'ops' with an array of operations to execute in the
960      * datapath. */
961     fail_open = false;
962     n_ops = 0;
963     for (i = 0; i < n_upcalls; i++) {
964         struct upcall *upcall = &upcalls[i];
965         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
966         struct dpif_op *op;
967
968         fail_open = fail_open || upcall->xout.fail_open;
969
970         if (upcall->flow.in_port.ofp_port
971             != vsp_realdev_to_vlandev(upcall->ofproto,
972                                       upcall->flow.in_port.ofp_port,
973                                       upcall->flow.vlan_tci)) {
974             /* This packet was received on a VLAN splinter port.  We
975              * added a VLAN to the packet to make the packet resemble
976              * the flow, but the actions were composed assuming that
977              * the packet contained no VLAN.  So, we must remove the
978              * VLAN header from the packet before trying to execute the
979              * actions. */
980             if (ofpbuf_size(&upcall->xout.odp_actions)) {
981                 eth_pop_vlan(packet);
982             }
983
984             /* Remove the flow vlan tags inserted by vlan splinter logic
985              * to ensure megaflow masks generated match the data path flow. */
986             upcall->flow.vlan_tci = 0;
987         }
988
989         /* Do not install a flow into the datapath if:
990          *
991          *    - The datapath already has too many flows.
992          *
993          *    - We received this packet via some flow installed in the kernel
994          *      already. */
995         if (may_put
996             && upcall->dpif_upcall.type == DPIF_UC_MISS) {
997             struct ofpbuf mask;
998             bool megaflow;
999
1000             atomic_read(&enable_megaflows, &megaflow);
1001             ofpbuf_use_stack(&mask, &upcall->mask_buf, sizeof upcall->mask_buf);
1002             if (megaflow) {
1003                 size_t max_mpls;
1004                 bool recirc;
1005
1006                 recirc = ofproto_dpif_get_enable_recirc(upcall->ofproto);
1007                 max_mpls = ofproto_dpif_get_max_mpls_depth(upcall->ofproto);
1008                 odp_flow_key_from_mask(&mask, &upcall->xout.wc.masks,
1009                                        &upcall->flow, UINT32_MAX, max_mpls,
1010                                        recirc);
1011             }
1012
1013             op = &ops[n_ops++];
1014             op->type = DPIF_OP_FLOW_PUT;
1015             op->u.flow_put.flags = DPIF_FP_CREATE;
1016             op->u.flow_put.key = upcall->key;
1017             op->u.flow_put.key_len = upcall->key_len;
1018             op->u.flow_put.mask = ofpbuf_data(&mask);
1019             op->u.flow_put.mask_len = ofpbuf_size(&mask);
1020             op->u.flow_put.stats = NULL;
1021
1022             if (!upcall->xout.slow) {
1023                 op->u.flow_put.actions = ofpbuf_data(&upcall->xout.odp_actions);
1024                 op->u.flow_put.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
1025             } else {
1026                 struct ofpbuf buf;
1027
1028                 ofpbuf_use_stack(&buf, upcall->slow_path_buf,
1029                                  sizeof upcall->slow_path_buf);
1030                 compose_slow_path(udpif, &upcall->xout, &upcall->flow,
1031                                   upcall->odp_in_port, &buf);
1032                 op->u.flow_put.actions = ofpbuf_data(&buf);
1033                 op->u.flow_put.actions_len = ofpbuf_size(&buf);
1034             }
1035         }
1036
1037         if (ofpbuf_size(&upcall->xout.odp_actions)) {
1038
1039             op = &ops[n_ops++];
1040             op->type = DPIF_OP_EXECUTE;
1041             op->u.execute.packet = packet;
1042             odp_key_to_pkt_metadata(upcall->key, upcall->key_len,
1043                                     &op->u.execute.md);
1044             op->u.execute.actions = ofpbuf_data(&upcall->xout.odp_actions);
1045             op->u.execute.actions_len = ofpbuf_size(&upcall->xout.odp_actions);
1046             op->u.execute.needs_help = (upcall->xout.slow & SLOW_ACTION) != 0;
1047         }
1048     }
1049
1050     /* Special case for fail-open mode.
1051      *
1052      * If we are in fail-open mode, but we are connected to a controller too,
1053      * then we should send the packet up to the controller in the hope that it
1054      * will try to set up a flow and thereby allow us to exit fail-open.
1055      *
1056      * See the top-level comment in fail-open.c for more information.
1057      *
1058      * Copy packets before they are modified by execution. */
1059     if (fail_open) {
1060         for (i = 0; i < n_upcalls; i++) {
1061             struct upcall *upcall = &upcalls[i];
1062             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
1063             struct ofproto_packet_in *pin;
1064
1065             pin = xmalloc(sizeof *pin);
1066             pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
1067             pin->up.packet_len = ofpbuf_size(packet);
1068             pin->up.reason = OFPR_NO_MATCH;
1069             pin->up.table_id = 0;
1070             pin->up.cookie = OVS_BE64_MAX;
1071             flow_get_metadata(&upcall->flow, &pin->up.fmd);
1072             pin->send_len = 0; /* Not used for flow table misses. */
1073             pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
1074             ofproto_dpif_send_packet_in(upcall->ofproto, pin);
1075         }
1076     }
1077
1078     /* Execute batch. */
1079     for (i = 0; i < n_ops; i++) {
1080         opsp[i] = &ops[i];
1081     }
1082     dpif_operate(udpif->dpif, opsp, n_ops);
1083 }
1084
1085 /* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. */
1086 static struct udpif_key *
1087 ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1088             uint32_t hash)
1089     OVS_REQUIRES(udpif->ukeys->mutex)
1090 {
1091     struct udpif_key *ukey;
1092     struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
1093
1094     HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
1095         if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
1096             return ukey;
1097         }
1098     }
1099     return NULL;
1100 }
1101
1102 /* Creates a ukey for 'key' and 'key_len', returning it with ukey->mutex in
1103  * a locked state. */
1104 static struct udpif_key *
1105 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
1106     OVS_NO_THREAD_SAFETY_ANALYSIS
1107 {
1108     struct udpif_key *ukey = xmalloc(sizeof *ukey);
1109
1110     ovs_mutex_init(&ukey->mutex);
1111     ukey->key = (struct nlattr *) &ukey->key_buf;
1112     memcpy(&ukey->key_buf, key, key_len);
1113     ukey->key_len = key_len;
1114
1115     ovs_mutex_lock(&ukey->mutex);
1116     ukey->dump_seq = 0;
1117     ukey->flow_exists = true;
1118     ukey->created = used ? used : time_msec();
1119     memset(&ukey->stats, 0, sizeof ukey->stats);
1120     ukey->xcache = NULL;
1121
1122     return ukey;
1123 }
1124
1125 /* Searches for a ukey in 'udpif->ukeys' that matches 'key' and 'key_len' and
1126  * attempts to lock the ukey. If the ukey does not exist, create it.
1127  *
1128  * Returns true on success, setting *result to the matching ukey and returning
1129  * it in a locked state. Otherwise, returns false and clears *result. */
1130 static bool
1131 ukey_acquire(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1132              long long int used, struct udpif_key **result)
1133     OVS_TRY_LOCK(true, (*result)->mutex)
1134 {
1135     struct udpif_key *ukey;
1136     uint32_t hash, idx;
1137     bool locked = false;
1138
1139     hash = hash_bytes(key, key_len, udpif->secret);
1140     idx = hash % udpif->n_revalidators;
1141
1142     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1143     ukey = ukey_lookup(udpif, key, key_len, hash);
1144     if (!ukey) {
1145         ukey = ukey_create(key, key_len, used);
1146         hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
1147         locked = true;
1148     } else if (!ovs_mutex_trylock(&ukey->mutex)) {
1149         locked = true;
1150     }
1151     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1152
1153     if (locked) {
1154         *result = ukey;
1155     } else {
1156         *result = NULL;
1157     }
1158     return locked;
1159 }
1160
1161 static void
1162 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
1163     OVS_NO_THREAD_SAFETY_ANALYSIS
1164 {
1165     if (revalidator) {
1166         hmap_remove(revalidator->ukeys, &ukey->hmap_node);
1167     }
1168     xlate_cache_delete(ukey->xcache);
1169     ovs_mutex_destroy(&ukey->mutex);
1170     free(ukey);
1171 }
1172
1173 static bool
1174 should_revalidate(const struct udpif *udpif, uint64_t packets,
1175                   long long int used)
1176 {
1177     long long int metric, now, duration;
1178
1179     if (udpif->dump_duration < 200) {
1180         /* We are likely to handle full revalidation for the flows. */
1181         return true;
1182     }
1183
1184     /* Calculate the mean time between seeing these packets. If this
1185      * exceeds the threshold, then delete the flow rather than performing
1186      * costly revalidation for flows that aren't being hit frequently.
1187      *
1188      * This is targeted at situations where the dump_duration is high (~1s),
1189      * and revalidation is triggered by a call to udpif_revalidate(). In
1190      * these situations, revalidation of all flows causes fluctuations in the
1191      * flow_limit due to the interaction with the dump_duration and max_idle.
1192      * This tends to result in deletion of low-throughput flows anyway, so
1193      * skip the revalidation and just delete those flows. */
1194     packets = MAX(packets, 1);
1195     now = MAX(used, time_msec());
1196     duration = now - used;
1197     metric = duration / packets;
1198
1199     if (metric < 200) {
1200         /* The flow is receiving more than ~5pps, so keep it. */
1201         return true;
1202     }
1203     return false;
1204 }
1205
1206 static bool
1207 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1208                 const struct dpif_flow *f)
1209     OVS_REQUIRES(ukey->mutex)
1210 {
1211     uint64_t slow_path_buf[128 / 8];
1212     struct xlate_out xout, *xoutp;
1213     struct netflow *netflow;
1214     struct ofproto_dpif *ofproto;
1215     struct dpif_flow_stats push;
1216     struct ofpbuf xout_actions;
1217     struct flow flow, dp_mask;
1218     uint32_t *dp32, *xout32;
1219     odp_port_t odp_in_port;
1220     struct xlate_in xin;
1221     long long int last_used;
1222     int error;
1223     size_t i;
1224     bool may_learn, ok;
1225
1226     ok = false;
1227     xoutp = NULL;
1228     netflow = NULL;
1229
1230     last_used = ukey->stats.used;
1231     push.used = f->stats.used;
1232     push.tcp_flags = f->stats.tcp_flags;
1233     push.n_packets = (f->stats.n_packets > ukey->stats.n_packets
1234                       ? f->stats.n_packets - ukey->stats.n_packets
1235                       : 0);
1236     push.n_bytes = (f->stats.n_bytes > ukey->stats.n_bytes
1237                     ? f->stats.n_bytes - ukey->stats.n_bytes
1238                     : 0);
1239
1240     if (udpif->need_revalidate && last_used
1241         && !should_revalidate(udpif, push.n_packets, last_used)) {
1242         ok = false;
1243         goto exit;
1244     }
1245
1246     /* We will push the stats, so update the ukey stats cache. */
1247     ukey->stats = f->stats;
1248     if (!push.n_packets && !udpif->need_revalidate) {
1249         ok = true;
1250         goto exit;
1251     }
1252
1253     may_learn = push.n_packets > 0;
1254     if (ukey->xcache && !udpif->need_revalidate) {
1255         xlate_push_stats(ukey->xcache, may_learn, &push);
1256         ok = true;
1257         goto exit;
1258     }
1259
1260     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
1261                           &ofproto, NULL, NULL, &netflow, &odp_in_port);
1262     if (error) {
1263         goto exit;
1264     }
1265
1266     if (udpif->need_revalidate) {
1267         xlate_cache_clear(ukey->xcache);
1268     }
1269     if (!ukey->xcache) {
1270         ukey->xcache = xlate_cache_new();
1271     }
1272
1273     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
1274     xin.resubmit_stats = push.n_packets ? &push : NULL;
1275     xin.xcache = ukey->xcache;
1276     xin.may_learn = may_learn;
1277     xin.skip_wildcards = !udpif->need_revalidate;
1278     xlate_actions(&xin, &xout);
1279     xoutp = &xout;
1280
1281     if (!udpif->need_revalidate) {
1282         ok = true;
1283         goto exit;
1284     }
1285
1286     if (!xout.slow) {
1287         ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
1288                          ofpbuf_size(&xout.odp_actions));
1289     } else {
1290         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
1291         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
1292     }
1293
1294     if (f->actions_len != ofpbuf_size(&xout_actions)
1295         || memcmp(ofpbuf_data(&xout_actions), f->actions, f->actions_len)) {
1296         goto exit;
1297     }
1298
1299     if (odp_flow_key_to_mask(f->mask, f->mask_len, &dp_mask, &flow)
1300         == ODP_FIT_ERROR) {
1301         goto exit;
1302     }
1303
1304     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1305      * directly check that the masks are the same.  Instead we check that the
1306      * mask in the kernel is more specific i.e. less wildcarded, than what
1307      * we've calculated here.  This guarantees we don't catch any packets we
1308      * shouldn't with the megaflow. */
1309     dp32 = (uint32_t *) &dp_mask;
1310     xout32 = (uint32_t *) &xout.wc.masks;
1311     for (i = 0; i < FLOW_U32S; i++) {
1312         if ((dp32[i] | xout32[i]) != dp32[i]) {
1313             goto exit;
1314         }
1315     }
1316     ok = true;
1317
1318 exit:
1319     if (netflow) {
1320         if (!ok) {
1321             netflow_flow_clear(netflow, &flow);
1322         }
1323         netflow_unref(netflow);
1324     }
1325     xlate_out_uninit(xoutp);
1326     return ok;
1327 }
1328
1329 struct dump_op {
1330     struct udpif_key *ukey;
1331     struct dpif_flow_stats stats; /* Stats for 'op'. */
1332     struct dpif_op op;            /* Flow del operation. */
1333 };
1334
1335 static void
1336 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
1337              struct udpif_key *ukey)
1338 {
1339     op->ukey = ukey;
1340     op->op.type = DPIF_OP_FLOW_DEL;
1341     op->op.u.flow_del.key = key;
1342     op->op.u.flow_del.key_len = key_len;
1343     op->op.u.flow_del.stats = &op->stats;
1344 }
1345
1346 static void
1347 push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
1348 {
1349     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1350     size_t i;
1351
1352     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1353     for (i = 0; i < n_ops; i++) {
1354         opsp[i] = &ops[i].op;
1355     }
1356     dpif_operate(udpif->dpif, opsp, n_ops);
1357
1358     for (i = 0; i < n_ops; i++) {
1359         struct dump_op *op = &ops[i];
1360         struct dpif_flow_stats *push, *stats, push_buf;
1361
1362         stats = op->op.u.flow_del.stats;
1363         push = &push_buf;
1364
1365         ovs_mutex_lock(&op->ukey->mutex);
1366         push->used = MAX(stats->used, op->ukey->stats.used);
1367         push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1368         push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1369         push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1370         ovs_mutex_unlock(&op->ukey->mutex);
1371
1372         if (push->n_packets || netflow_exists()) {
1373             struct ofproto_dpif *ofproto;
1374             struct netflow *netflow;
1375             struct flow flow;
1376             bool may_learn;
1377             int error;
1378
1379             may_learn = push->n_packets > 0;
1380             ovs_mutex_lock(&op->ukey->mutex);
1381             if (op->ukey->xcache) {
1382                 xlate_push_stats(op->ukey->xcache, may_learn, push);
1383                 ovs_mutex_unlock(&op->ukey->mutex);
1384                 continue;
1385             }
1386             ovs_mutex_unlock(&op->ukey->mutex);
1387
1388             error = xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
1389                                   op->op.u.flow_del.key_len, &flow, &ofproto,
1390                                   NULL, NULL, &netflow, NULL);
1391             if (!error) {
1392                 struct xlate_in xin;
1393
1394                 xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
1395                               NULL);
1396                 xin.resubmit_stats = push->n_packets ? push : NULL;
1397                 xin.may_learn = may_learn;
1398                 xin.skip_wildcards = true;
1399                 xlate_actions_for_side_effects(&xin);
1400
1401                 if (netflow) {
1402                     netflow_flow_clear(netflow, &flow);
1403                     netflow_unref(netflow);
1404                 }
1405             }
1406         }
1407     }
1408 }
1409
1410 static void
1411 push_dump_ops(struct revalidator *revalidator,
1412               struct dump_op *ops, size_t n_ops)
1413 {
1414     int i;
1415
1416     push_dump_ops__(revalidator->udpif, ops, n_ops);
1417     for (i = 0; i < n_ops; i++) {
1418         ukey_delete(revalidator, ops[i].ukey);
1419     }
1420 }
1421
1422 static void
1423 revalidate(struct revalidator *revalidator)
1424 {
1425     struct udpif *udpif = revalidator->udpif;
1426     struct dpif_flow_dump_thread *dump_thread;
1427     uint64_t dump_seq;
1428     unsigned int flow_limit;
1429
1430     dump_seq = seq_read(udpif->dump_seq);
1431     atomic_read(&udpif->flow_limit, &flow_limit);
1432     dump_thread = dpif_flow_dump_thread_create(udpif->dump);
1433     for (;;) {
1434         struct dump_op ops[REVALIDATE_MAX_BATCH];
1435         int n_ops = 0;
1436
1437         struct dpif_flow flows[REVALIDATE_MAX_BATCH];
1438         const struct dpif_flow *f;
1439         int n_dumped;
1440
1441         long long int max_idle;
1442         long long int now;
1443         size_t n_dp_flows;
1444         bool kill_them_all;
1445
1446         n_dumped = dpif_flow_dump_next(dump_thread, flows, ARRAY_SIZE(flows));
1447         if (!n_dumped) {
1448             break;
1449         }
1450
1451         now = time_msec();
1452
1453         /* In normal operation we want to keep flows around until they have
1454          * been idle for 'ofproto_max_idle' milliseconds.  However:
1455          *
1456          *     - If the number of datapath flows climbs above 'flow_limit',
1457          *       drop that down to 100 ms to try to bring the flows down to
1458          *       the limit.
1459          *
1460          *     - If the number of datapath flows climbs above twice
1461          *       'flow_limit', delete all the datapath flows as an emergency
1462          *       measure.  (We reassess this condition for the next batch of
1463          *       datapath flows, so we will recover before all the flows are
1464          *       gone.) */
1465         n_dp_flows = udpif_get_n_flows(udpif);
1466         kill_them_all = n_dp_flows > flow_limit * 2;
1467         max_idle = n_dp_flows > flow_limit ? 100 : ofproto_max_idle;
1468
1469         for (f = flows; f < &flows[n_dumped]; f++) {
1470             long long int used = f->stats.used;
1471             struct udpif_key *ukey;
1472             bool already_dumped, keep;
1473
1474             if (!ukey_acquire(udpif, f->key, f->key_len, used, &ukey)) {
1475                 /* We couldn't acquire the ukey. This means that
1476                  * another revalidator is processing this flow
1477                  * concurrently, so don't bother processing it. */
1478                 COVERAGE_INC(upcall_duplicate_flow);
1479                 continue;
1480             }
1481
1482             already_dumped = ukey->dump_seq == dump_seq;
1483             if (already_dumped) {
1484                 /* The flow has already been dumped and handled by another
1485                  * revalidator during this flow dump operation. Skip it. */
1486                 COVERAGE_INC(upcall_duplicate_flow);
1487                 ovs_mutex_unlock(&ukey->mutex);
1488                 continue;
1489             }
1490
1491             if (!used) {
1492                 used = ukey->created;
1493             }
1494             if (kill_them_all || (used && used < now - max_idle)) {
1495                 keep = false;
1496             } else {
1497                 keep = revalidate_ukey(udpif, ukey, f);
1498             }
1499             ukey->dump_seq = dump_seq;
1500             ukey->flow_exists = keep;
1501
1502             if (!keep) {
1503                 dump_op_init(&ops[n_ops++], f->key, f->key_len, ukey);
1504             }
1505             ovs_mutex_unlock(&ukey->mutex);
1506         }
1507
1508         if (n_ops) {
1509             push_dump_ops__(udpif, ops, n_ops);
1510         }
1511     }
1512     dpif_flow_dump_thread_destroy(dump_thread);
1513 }
1514
1515 /* Called with exclusive access to 'revalidator' and 'ukey'. */
1516 static bool
1517 handle_missed_revalidation(struct revalidator *revalidator,
1518                            struct udpif_key *ukey)
1519     OVS_NO_THREAD_SAFETY_ANALYSIS
1520 {
1521     struct udpif *udpif = revalidator->udpif;
1522     struct dpif_flow flow;
1523     struct ofpbuf *buf;
1524     bool keep = false;
1525
1526     COVERAGE_INC(revalidate_missed_dp_flow);
1527
1528     if (!dpif_flow_get(udpif->dpif, ukey->key, ukey->key_len, &buf, &flow)) {
1529         keep = revalidate_ukey(udpif, ukey, &flow);
1530         ofpbuf_delete(buf);
1531     }
1532
1533     return keep;
1534 }
1535
1536 static void
1537 revalidator_sweep__(struct revalidator *revalidator, bool purge)
1538     OVS_NO_THREAD_SAFETY_ANALYSIS
1539 {
1540     struct dump_op ops[REVALIDATE_MAX_BATCH];
1541     struct udpif_key *ukey, *next;
1542     size_t n_ops;
1543     uint64_t dump_seq;
1544
1545     n_ops = 0;
1546     dump_seq = seq_read(revalidator->udpif->dump_seq);
1547
1548     /* During garbage collection, this revalidator completely owns its ukeys
1549      * map, and therefore doesn't need to do any locking. */
1550     HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
1551         if (ukey->flow_exists
1552             && (purge
1553                 || (ukey->dump_seq != dump_seq
1554                     && revalidator->udpif->need_revalidate
1555                     && !handle_missed_revalidation(revalidator, ukey)))) {
1556             struct dump_op *op = &ops[n_ops++];
1557
1558             dump_op_init(op, ukey->key, ukey->key_len, ukey);
1559             if (n_ops == REVALIDATE_MAX_BATCH) {
1560                 push_dump_ops(revalidator, ops, n_ops);
1561                 n_ops = 0;
1562             }
1563         } else if (!ukey->flow_exists) {
1564             ukey_delete(revalidator, ukey);
1565         }
1566     }
1567
1568     if (n_ops) {
1569         push_dump_ops(revalidator, ops, n_ops);
1570     }
1571 }
1572
1573 static void
1574 revalidator_sweep(struct revalidator *revalidator)
1575 {
1576     revalidator_sweep__(revalidator, false);
1577 }
1578
1579 static void
1580 revalidator_purge(struct revalidator *revalidator)
1581 {
1582     revalidator_sweep__(revalidator, true);
1583 }
1584 \f
1585 static void
1586 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
1587                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
1588 {
1589     struct ds ds = DS_EMPTY_INITIALIZER;
1590     struct udpif *udpif;
1591
1592     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1593         unsigned int flow_limit;
1594         size_t i;
1595
1596         atomic_read(&udpif->flow_limit, &flow_limit);
1597
1598         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
1599         ds_put_format(&ds, "\tflows         : (current %lu)"
1600             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
1601             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
1602         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
1603
1604         ds_put_char(&ds, '\n');
1605         for (i = 0; i < n_revalidators; i++) {
1606             struct revalidator *revalidator = &udpif->revalidators[i];
1607
1608             ovs_mutex_lock(&udpif->ukeys[i].mutex);
1609             ds_put_format(&ds, "\t%u: (keys %"PRIuSIZE")\n",
1610                           revalidator->id, hmap_count(&udpif->ukeys[i].hmap));
1611             ovs_mutex_unlock(&udpif->ukeys[i].mutex);
1612         }
1613     }
1614
1615     unixctl_command_reply(conn, ds_cstr(&ds));
1616     ds_destroy(&ds);
1617 }
1618
1619 /* Disable using the megaflows.
1620  *
1621  * This command is only needed for advanced debugging, so it's not
1622  * documented in the man page. */
1623 static void
1624 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
1625                                  int argc OVS_UNUSED,
1626                                  const char *argv[] OVS_UNUSED,
1627                                  void *aux OVS_UNUSED)
1628 {
1629     atomic_store(&enable_megaflows, false);
1630     udpif_flush_all_datapaths();
1631     unixctl_command_reply(conn, "megaflows disabled");
1632 }
1633
1634 /* Re-enable using megaflows.
1635  *
1636  * This command is only needed for advanced debugging, so it's not
1637  * documented in the man page. */
1638 static void
1639 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
1640                                 int argc OVS_UNUSED,
1641                                 const char *argv[] OVS_UNUSED,
1642                                 void *aux OVS_UNUSED)
1643 {
1644     atomic_store(&enable_megaflows, true);
1645     udpif_flush_all_datapaths();
1646     unixctl_command_reply(conn, "megaflows enabled");
1647 }
1648
1649 /* Set the flow limit.
1650  *
1651  * This command is only needed for advanced debugging, so it's not
1652  * documented in the man page. */
1653 static void
1654 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
1655                               int argc OVS_UNUSED,
1656                               const char *argv[] OVS_UNUSED,
1657                               void *aux OVS_UNUSED)
1658 {
1659     struct ds ds = DS_EMPTY_INITIALIZER;
1660     struct udpif *udpif;
1661     unsigned int flow_limit = atoi(argv[1]);
1662
1663     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1664         atomic_store(&udpif->flow_limit, flow_limit);
1665     }
1666     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
1667     unixctl_command_reply(conn, ds_cstr(&ds));
1668     ds_destroy(&ds);
1669 }
1670
1671 static void
1672 upcall_unixctl_dump_wait(struct unixctl_conn *conn,
1673                          int argc OVS_UNUSED,
1674                          const char *argv[] OVS_UNUSED,
1675                          void *aux OVS_UNUSED)
1676 {
1677     if (list_is_singleton(&all_udpifs)) {
1678         struct udpif *udpif;
1679         size_t len;
1680
1681         udpif = OBJECT_CONTAINING(list_front(&all_udpifs), udpif, list_node);
1682         len = (udpif->n_conns + 1) * sizeof *udpif->conns;
1683         udpif->conn_seq = seq_read(udpif->dump_seq);
1684         udpif->conns = xrealloc(udpif->conns, len);
1685         udpif->conns[udpif->n_conns++] = conn;
1686     } else {
1687         unixctl_command_reply_error(conn, "can't wait on multiple udpifs.");
1688     }
1689 }