ovs-thread: Implement OVS specific barrier.
[cascardo/ovs.git] / ofproto / ofproto-dpif-upcall.c
1 /* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
2  *
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.  */
14
15 #include <config.h>
16 #include "ofproto-dpif-upcall.h"
17
18 #include <errno.h>
19 #include <stdbool.h>
20 #include <inttypes.h>
21
22 #include "connmgr.h"
23 #include "coverage.h"
24 #include "dpif.h"
25 #include "dynamic-string.h"
26 #include "fail-open.h"
27 #include "guarded-list.h"
28 #include "latch.h"
29 #include "list.h"
30 #include "netlink.h"
31 #include "ofpbuf.h"
32 #include "ofproto-dpif-ipfix.h"
33 #include "ofproto-dpif-sflow.h"
34 #include "ofproto-dpif-xlate.h"
35 #include "ovs-rcu.h"
36 #include "packets.h"
37 #include "poll-loop.h"
38 #include "seq.h"
39 #include "unixctl.h"
40 #include "vlog.h"
41
42 #define MAX_QUEUE_LENGTH 512
43 #define FLOW_MISS_MAX_BATCH 50
44 #define REVALIDATE_MAX_BATCH 50
45
46 VLOG_DEFINE_THIS_MODULE(ofproto_dpif_upcall);
47
48 COVERAGE_DEFINE(upcall_duplicate_flow);
49
50 /* A thread that reads upcalls from dpif, forwards each upcall's packet,
51  * and possibly sets up a kernel flow as a cache. */
52 struct handler {
53     struct udpif *udpif;               /* Parent udpif. */
54     pthread_t thread;                  /* Thread ID. */
55     uint32_t handler_id;               /* Handler id. */
56 };
57
58 /* A thread that processes datapath flows, updates OpenFlow statistics, and
59  * updates or removes them if necessary. */
60 struct revalidator {
61     struct udpif *udpif;               /* Parent udpif. */
62     pthread_t thread;                  /* Thread ID. */
63     unsigned int id;                   /* ovsthread_id_self(). */
64     struct hmap *ukeys;                /* Points into udpif->ukeys for this
65                                           revalidator. Used for GC phase. */
66 };
67
68 /* An upcall handler for ofproto_dpif.
69  *
70  * udpif keeps records of two kind of logically separate units:
71  *
72  * upcall handling
73  * ---------------
74  *
75  *    - An array of 'struct handler's for upcall handling and flow
76  *      installation.
77  *
78  * flow revalidation
79  * -----------------
80  *
81  *    - Revalidation threads which read the datapath flow table and maintains
82  *      them.
83  */
84 struct udpif {
85     struct list list_node;             /* In all_udpifs list. */
86
87     struct dpif *dpif;                 /* Datapath handle. */
88     struct dpif_backer *backer;        /* Opaque dpif_backer pointer. */
89
90     uint32_t secret;                   /* Random seed for upcall hash. */
91
92     struct handler *handlers;          /* Upcall handlers. */
93     size_t n_handlers;
94
95     struct revalidator *revalidators;  /* Flow revalidators. */
96     size_t n_revalidators;
97
98     struct latch exit_latch;           /* Tells child threads to exit. */
99
100     /* Revalidation. */
101     struct seq *reval_seq;             /* Incremented to force revalidation. */
102     bool need_revalidate;              /* As indicated by 'reval_seq'. */
103     bool reval_exit;                   /* Set by leader on 'exit_latch. */
104     struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
105     struct dpif_flow_dump dump;        /* DPIF flow dump state. */
106     long long int dump_duration;       /* Duration of the last flow dump. */
107     struct seq *dump_seq;              /* Increments each dump iteration. */
108
109     /* There are 'n_revalidators' ukey hmaps. Each revalidator retains a
110      * reference to one of these for garbage collection.
111      *
112      * During the flow dump phase, revalidators insert into these with a random
113      * distribution. During the garbage collection phase, each revalidator
114      * takes care of garbage collecting one of these hmaps. */
115     struct {
116         struct ovs_mutex mutex;        /* Guards the following. */
117         struct hmap hmap OVS_GUARDED;  /* Datapath flow keys. */
118     } *ukeys;
119
120     /* Datapath flow statistics. */
121     unsigned int max_n_flows;
122     unsigned int avg_n_flows;
123
124     /* Following fields are accessed and modified by different threads. */
125     atomic_uint flow_limit;            /* Datapath flow hard limit. */
126
127     /* n_flows_mutex prevents multiple threads updating these concurrently. */
128     atomic_ulong n_flows;           /* Number of flows in the datapath. */
129     atomic_llong n_flows_timestamp;    /* Last time n_flows was updated. */
130     struct ovs_mutex n_flows_mutex;
131 };
132
133 enum upcall_type {
134     BAD_UPCALL,                 /* Some kind of bug somewhere. */
135     MISS_UPCALL,                /* A flow miss.  */
136     SFLOW_UPCALL,               /* sFlow sample. */
137     FLOW_SAMPLE_UPCALL,         /* Per-flow sampling. */
138     IPFIX_UPCALL                /* Per-bridge sampling. */
139 };
140
141 struct upcall {
142     struct flow_miss *flow_miss;    /* This upcall's flow_miss. */
143
144     /* Raw upcall plus data for keeping track of the memory backing it. */
145     struct dpif_upcall dpif_upcall; /* As returned by dpif_recv() */
146     struct ofpbuf upcall_buf;       /* Owns some data in 'dpif_upcall'. */
147     uint64_t upcall_stub[512 / 8];  /* Buffer to reduce need for malloc(). */
148 };
149
150 /* 'udpif_key's are responsible for tracking the little bit of state udpif
151  * needs to do flow expiration which can't be pulled directly from the
152  * datapath.  They may be created or maintained by any revalidator during
153  * the dump phase, but are owned by a single revalidator, and are destroyed
154  * by that revalidator during the garbage-collection phase.
155  *
156  * While some elements of a udpif_key are protected by a mutex, the ukey itself
157  * is not.  Therefore it is not safe to destroy a udpif_key except when all
158  * revalidators are in garbage collection phase, or they aren't running. */
159 struct udpif_key {
160     struct hmap_node hmap_node;     /* In parent revalidator 'ukeys' map. */
161
162     /* These elements are read only once created, and therefore aren't
163      * protected by a mutex. */
164     const struct nlattr *key;      /* Datapath flow key. */
165     size_t key_len;                /* Length of 'key'. */
166
167     struct ovs_mutex mutex;                   /* Guards the following. */
168     struct dpif_flow_stats stats OVS_GUARDED; /* Last known stats.*/
169     long long int created OVS_GUARDED;        /* Estimate of creation time. */
170     bool mark OVS_GUARDED;                    /* For mark and sweep garbage
171                                                  collection. */
172     bool flow_exists OVS_GUARDED;             /* Ensures flows are only deleted
173                                                  once. */
174
175     struct xlate_cache *xcache OVS_GUARDED;   /* Cache for xlate entries that
176                                                * are affected by this ukey.
177                                                * Used for stats and learning.*/
178     struct odputil_keybuf key_buf;            /* Memory for 'key'. */
179 };
180
181 /* Flow miss batching.
182  *
183  * Some dpifs implement operations faster when you hand them off in a batch.
184  * To allow batching, "struct flow_miss" queues the dpif-related work needed
185  * for a given flow.  Each "struct flow_miss" corresponds to sending one or
186  * more packets, plus possibly installing the flow in the dpif. */
187 struct flow_miss {
188     struct hmap_node hmap_node;
189     struct ofproto_dpif *ofproto;
190
191     struct flow flow;
192     const struct nlattr *key;
193     size_t key_len;
194     enum dpif_upcall_type upcall_type;
195     struct dpif_flow_stats stats;
196     odp_port_t odp_in_port;
197
198     uint64_t slow_path_buf[128 / 8];
199     struct odputil_keybuf mask_buf;
200
201     struct xlate_out xout;
202
203     bool put;
204 };
205
206 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
207 static struct list all_udpifs = LIST_INITIALIZER(&all_udpifs);
208
209 static size_t read_upcalls(struct handler *,
210                            struct upcall upcalls[FLOW_MISS_MAX_BATCH],
211                            struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH],
212                            struct hmap *);
213 static void handle_upcalls(struct handler *, struct hmap *, struct upcall *,
214                            size_t n_upcalls);
215 static void udpif_stop_threads(struct udpif *);
216 static void udpif_start_threads(struct udpif *, size_t n_handlers,
217                                 size_t n_revalidators);
218 static void *udpif_upcall_handler(void *);
219 static void *udpif_revalidator(void *);
220 static unsigned long udpif_get_n_flows(struct udpif *);
221 static void revalidate(struct revalidator *);
222 static void revalidator_sweep(struct revalidator *);
223 static void revalidator_purge(struct revalidator *);
224 static void upcall_unixctl_show(struct unixctl_conn *conn, int argc,
225                                 const char *argv[], void *aux);
226 static void upcall_unixctl_disable_megaflows(struct unixctl_conn *, int argc,
227                                              const char *argv[], void *aux);
228 static void upcall_unixctl_enable_megaflows(struct unixctl_conn *, int argc,
229                                             const char *argv[], void *aux);
230 static void upcall_unixctl_set_flow_limit(struct unixctl_conn *conn, int argc,
231                                             const char *argv[], void *aux);
232
233 static struct udpif_key *ukey_create(const struct nlattr *key, size_t key_len,
234                                      long long int used);
235 static void ukey_delete(struct revalidator *, struct udpif_key *);
236
237 static atomic_bool enable_megaflows = ATOMIC_VAR_INIT(true);
238
239 struct udpif *
240 udpif_create(struct dpif_backer *backer, struct dpif *dpif)
241 {
242     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
243     struct udpif *udpif = xzalloc(sizeof *udpif);
244
245     if (ovsthread_once_start(&once)) {
246         unixctl_command_register("upcall/show", "", 0, 0, upcall_unixctl_show,
247                                  NULL);
248         unixctl_command_register("upcall/disable-megaflows", "", 0, 0,
249                                  upcall_unixctl_disable_megaflows, NULL);
250         unixctl_command_register("upcall/enable-megaflows", "", 0, 0,
251                                  upcall_unixctl_enable_megaflows, NULL);
252         unixctl_command_register("upcall/set-flow-limit", "", 1, 1,
253                                  upcall_unixctl_set_flow_limit, NULL);
254         ovsthread_once_done(&once);
255     }
256
257     udpif->dpif = dpif;
258     udpif->backer = backer;
259     atomic_init(&udpif->flow_limit, MIN(ofproto_flow_limit, 10000));
260     udpif->secret = random_uint32();
261     udpif->reval_seq = seq_create();
262     udpif->dump_seq = seq_create();
263     latch_init(&udpif->exit_latch);
264     list_push_back(&all_udpifs, &udpif->list_node);
265     atomic_init(&udpif->n_flows, 0);
266     atomic_init(&udpif->n_flows_timestamp, LLONG_MIN);
267     ovs_mutex_init(&udpif->n_flows_mutex);
268
269     return udpif;
270 }
271
272 void
273 udpif_destroy(struct udpif *udpif)
274 {
275     udpif_stop_threads(udpif);
276
277     list_remove(&udpif->list_node);
278     latch_destroy(&udpif->exit_latch);
279     seq_destroy(udpif->reval_seq);
280     seq_destroy(udpif->dump_seq);
281     ovs_mutex_destroy(&udpif->n_flows_mutex);
282     free(udpif);
283 }
284
285 /* Stops the handler and revalidator threads, must be enclosed in
286  * ovsrcu quiescent state unless when destroying udpif. */
287 static void
288 udpif_stop_threads(struct udpif *udpif)
289 {
290     if (udpif && (udpif->n_handlers != 0 || udpif->n_revalidators != 0)) {
291         size_t i;
292
293         latch_set(&udpif->exit_latch);
294
295         for (i = 0; i < udpif->n_handlers; i++) {
296             struct handler *handler = &udpif->handlers[i];
297
298             xpthread_join(handler->thread, NULL);
299         }
300
301         for (i = 0; i < udpif->n_revalidators; i++) {
302             xpthread_join(udpif->revalidators[i].thread, NULL);
303         }
304
305         for (i = 0; i < udpif->n_revalidators; i++) {
306             struct revalidator *revalidator = &udpif->revalidators[i];
307
308             /* Delete ukeys, and delete all flows from the datapath to prevent
309              * double-counting stats. */
310             revalidator_purge(revalidator);
311
312             hmap_destroy(&udpif->ukeys[i].hmap);
313             ovs_mutex_destroy(&udpif->ukeys[i].mutex);
314         }
315
316         latch_poll(&udpif->exit_latch);
317
318         ovs_barrier_destroy(&udpif->reval_barrier);
319
320         free(udpif->revalidators);
321         udpif->revalidators = NULL;
322         udpif->n_revalidators = 0;
323
324         free(udpif->handlers);
325         udpif->handlers = NULL;
326         udpif->n_handlers = 0;
327
328         free(udpif->ukeys);
329         udpif->ukeys = NULL;
330     }
331 }
332
333 /* Starts the handler and revalidator threads, must be enclosed in
334  * ovsrcu quiescent state. */
335 static void
336 udpif_start_threads(struct udpif *udpif, size_t n_handlers,
337                     size_t n_revalidators)
338 {
339     if (udpif && n_handlers && n_revalidators) {
340         size_t i;
341
342         udpif->n_handlers = n_handlers;
343         udpif->n_revalidators = n_revalidators;
344
345         udpif->handlers = xzalloc(udpif->n_handlers * sizeof *udpif->handlers);
346         for (i = 0; i < udpif->n_handlers; i++) {
347             struct handler *handler = &udpif->handlers[i];
348
349             handler->udpif = udpif;
350             handler->handler_id = i;
351             handler->thread = ovs_thread_create(
352                 "handler", udpif_upcall_handler, handler);
353         }
354
355         ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
356         udpif->reval_exit = false;
357         udpif->revalidators = xzalloc(udpif->n_revalidators
358                                       * sizeof *udpif->revalidators);
359         udpif->ukeys = xmalloc(sizeof *udpif->ukeys * n_revalidators);
360         for (i = 0; i < udpif->n_revalidators; i++) {
361             struct revalidator *revalidator = &udpif->revalidators[i];
362
363             revalidator->udpif = udpif;
364             hmap_init(&udpif->ukeys[i].hmap);
365             ovs_mutex_init(&udpif->ukeys[i].mutex);
366             revalidator->ukeys = &udpif->ukeys[i].hmap;
367             revalidator->thread = ovs_thread_create(
368                 "revalidator", udpif_revalidator, revalidator);
369         }
370     }
371 }
372
373 /* Tells 'udpif' how many threads it should use to handle upcalls.
374  * 'n_handlers' and 'n_revalidators' can never be zero.  'udpif''s
375  * datapath handle must have packet reception enabled before starting
376  * threads. */
377 void
378 udpif_set_threads(struct udpif *udpif, size_t n_handlers,
379                   size_t n_revalidators)
380 {
381     ovs_assert(udpif);
382     ovs_assert(n_handlers && n_revalidators);
383
384     ovsrcu_quiesce_start();
385     if (udpif->n_handlers != n_handlers
386         || udpif->n_revalidators != n_revalidators) {
387         udpif_stop_threads(udpif);
388     }
389
390     if (!udpif->handlers && !udpif->revalidators) {
391         int error;
392
393         error = dpif_handlers_set(udpif->dpif, n_handlers);
394         if (error) {
395             VLOG_ERR("failed to configure handlers in dpif %s: %s",
396                      dpif_name(udpif->dpif), ovs_strerror(error));
397             return;
398         }
399
400         udpif_start_threads(udpif, n_handlers, n_revalidators);
401     }
402     ovsrcu_quiesce_end();
403 }
404
405 /* Waits for all ongoing upcall translations to complete.  This ensures that
406  * there are no transient references to any removed ofprotos (or other
407  * objects).  In particular, this should be called after an ofproto is removed
408  * (e.g. via xlate_remove_ofproto()) but before it is destroyed. */
409 void
410 udpif_synchronize(struct udpif *udpif)
411 {
412     /* This is stronger than necessary.  It would be sufficient to ensure
413      * (somehow) that each handler and revalidator thread had passed through
414      * its main loop once. */
415     size_t n_handlers = udpif->n_handlers;
416     size_t n_revalidators = udpif->n_revalidators;
417
418     ovsrcu_quiesce_start();
419     udpif_stop_threads(udpif);
420     udpif_start_threads(udpif, n_handlers, n_revalidators);
421     ovsrcu_quiesce_end();
422 }
423
424 /* Notifies 'udpif' that something changed which may render previous
425  * xlate_actions() results invalid. */
426 void
427 udpif_revalidate(struct udpif *udpif)
428 {
429     seq_change(udpif->reval_seq);
430 }
431
432 /* Returns a seq which increments every time 'udpif' pulls stats from the
433  * datapath.  Callers can use this to get a sense of when might be a good time
434  * to do periodic work which relies on relatively up to date statistics. */
435 struct seq *
436 udpif_dump_seq(struct udpif *udpif)
437 {
438     return udpif->dump_seq;
439 }
440
441 void
442 udpif_get_memory_usage(struct udpif *udpif, struct simap *usage)
443 {
444     size_t i;
445
446     simap_increase(usage, "handlers", udpif->n_handlers);
447
448     simap_increase(usage, "revalidators", udpif->n_revalidators);
449     for (i = 0; i < udpif->n_revalidators; i++) {
450         ovs_mutex_lock(&udpif->ukeys[i].mutex);
451         simap_increase(usage, "udpif keys", hmap_count(&udpif->ukeys[i].hmap));
452         ovs_mutex_unlock(&udpif->ukeys[i].mutex);
453     }
454 }
455
456 /* Remove flows from a single datapath. */
457 void
458 udpif_flush(struct udpif *udpif)
459 {
460     size_t n_handlers, n_revalidators;
461
462     n_handlers = udpif->n_handlers;
463     n_revalidators = udpif->n_revalidators;
464
465     ovsrcu_quiesce_start();
466
467     udpif_stop_threads(udpif);
468     dpif_flow_flush(udpif->dpif);
469     udpif_start_threads(udpif, n_handlers, n_revalidators);
470
471     ovsrcu_quiesce_end();
472 }
473
474 /* Removes all flows from all datapaths. */
475 static void
476 udpif_flush_all_datapaths(void)
477 {
478     struct udpif *udpif;
479
480     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
481         udpif_flush(udpif);
482     }
483 }
484
485 \f
486 static unsigned long
487 udpif_get_n_flows(struct udpif *udpif)
488 {
489     long long int time, now;
490     unsigned long flow_count;
491
492     now = time_msec();
493     atomic_read(&udpif->n_flows_timestamp, &time);
494     if (time < now - 100 && !ovs_mutex_trylock(&udpif->n_flows_mutex)) {
495         struct dpif_dp_stats stats;
496
497         atomic_store(&udpif->n_flows_timestamp, now);
498         dpif_get_dp_stats(udpif->dpif, &stats);
499         flow_count = stats.n_flows;
500         atomic_store(&udpif->n_flows, flow_count);
501         ovs_mutex_unlock(&udpif->n_flows_mutex);
502     } else {
503         atomic_read(&udpif->n_flows, &flow_count);
504     }
505     return flow_count;
506 }
507
508 /* The upcall handler thread tries to read a batch of FLOW_MISS_MAX_BATCH
509  * upcalls from dpif, processes the batch and installs corresponding flows
510  * in dpif. */
511 static void *
512 udpif_upcall_handler(void *arg)
513 {
514     struct handler *handler = arg;
515     struct udpif *udpif = handler->udpif;
516     struct hmap misses = HMAP_INITIALIZER(&misses);
517
518     while (!latch_is_set(&handler->udpif->exit_latch)) {
519         struct upcall upcalls[FLOW_MISS_MAX_BATCH];
520         struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH];
521         struct flow_miss *miss;
522         size_t n_upcalls, i;
523
524         n_upcalls = read_upcalls(handler, upcalls, miss_buf, &misses);
525         if (!n_upcalls) {
526             dpif_recv_wait(udpif->dpif, handler->handler_id);
527             latch_wait(&udpif->exit_latch);
528             poll_block();
529         } else {
530             handle_upcalls(handler, &misses, upcalls, n_upcalls);
531
532             HMAP_FOR_EACH (miss, hmap_node, &misses) {
533                 xlate_out_uninit(&miss->xout);
534             }
535             hmap_clear(&misses);
536             for (i = 0; i < n_upcalls; i++) {
537                 ofpbuf_uninit(&upcalls[i].dpif_upcall.packet);
538                 ofpbuf_uninit(&upcalls[i].upcall_buf);
539             }
540         }
541         coverage_clear();
542     }
543     hmap_destroy(&misses);
544
545     return NULL;
546 }
547
548 static void *
549 udpif_revalidator(void *arg)
550 {
551     /* Used by all revalidators. */
552     struct revalidator *revalidator = arg;
553     struct udpif *udpif = revalidator->udpif;
554     bool leader = revalidator == &udpif->revalidators[0];
555
556     /* Used only by the leader. */
557     long long int start_time = 0;
558     uint64_t last_reval_seq = 0;
559     unsigned int flow_limit = 0;
560     size_t n_flows = 0;
561
562     revalidator->id = ovsthread_id_self();
563     for (;;) {
564         if (leader) {
565             uint64_t reval_seq;
566
567             reval_seq = seq_read(udpif->reval_seq);
568             udpif->need_revalidate = last_reval_seq != reval_seq;
569             last_reval_seq = reval_seq;
570
571             n_flows = udpif_get_n_flows(udpif);
572             udpif->max_n_flows = MAX(n_flows, udpif->max_n_flows);
573             udpif->avg_n_flows = (udpif->avg_n_flows + n_flows) / 2;
574
575             /* Only the leader checks the exit latch to prevent a race where
576              * some threads think it's true and exit and others think it's
577              * false and block indefinitely on the reval_barrier */
578             udpif->reval_exit = latch_is_set(&udpif->exit_latch);
579
580             start_time = time_msec();
581             if (!udpif->reval_exit) {
582                 dpif_flow_dump_start(&udpif->dump, udpif->dpif);
583             }
584         }
585
586         /* Wait for the leader to start the flow dump. */
587         ovs_barrier_block(&udpif->reval_barrier);
588         if (udpif->reval_exit) {
589             break;
590         }
591         revalidate(revalidator);
592
593         /* Wait for all flows to have been dumped before we garbage collect. */
594         ovs_barrier_block(&udpif->reval_barrier);
595         revalidator_sweep(revalidator);
596
597         /* Wait for all revalidators to finish garbage collection. */
598         ovs_barrier_block(&udpif->reval_barrier);
599
600         if (leader) {
601             long long int duration;
602
603             dpif_flow_dump_done(&udpif->dump);
604             seq_change(udpif->dump_seq);
605
606             duration = MAX(time_msec() - start_time, 1);
607             atomic_read(&udpif->flow_limit, &flow_limit);
608             udpif->dump_duration = duration;
609             if (duration > 2000) {
610                 flow_limit /= duration / 1000;
611             } else if (duration > 1300) {
612                 flow_limit = flow_limit * 3 / 4;
613             } else if (duration < 1000 && n_flows > 2000
614                        && flow_limit < n_flows * 1000 / duration) {
615                 flow_limit += 1000;
616             }
617             flow_limit = MIN(ofproto_flow_limit, MAX(flow_limit, 1000));
618             atomic_store(&udpif->flow_limit, flow_limit);
619
620             if (duration > 2000) {
621                 VLOG_INFO("Spent an unreasonably long %lldms dumping flows",
622                           duration);
623             }
624
625             poll_timer_wait_until(start_time + MIN(ofproto_max_idle, 500));
626             seq_wait(udpif->reval_seq, last_reval_seq);
627             latch_wait(&udpif->exit_latch);
628             poll_block();
629         }
630     }
631
632     return NULL;
633 }
634 \f
635 static enum upcall_type
636 classify_upcall(const struct upcall *upcall)
637 {
638     const struct dpif_upcall *dpif_upcall = &upcall->dpif_upcall;
639     union user_action_cookie cookie;
640     size_t userdata_len;
641
642     /* First look at the upcall type. */
643     switch (dpif_upcall->type) {
644     case DPIF_UC_ACTION:
645         break;
646
647     case DPIF_UC_MISS:
648         return MISS_UPCALL;
649
650     case DPIF_N_UC_TYPES:
651     default:
652         VLOG_WARN_RL(&rl, "upcall has unexpected type %"PRIu32,
653                      dpif_upcall->type);
654         return BAD_UPCALL;
655     }
656
657     /* "action" upcalls need a closer look. */
658     if (!dpif_upcall->userdata) {
659         VLOG_WARN_RL(&rl, "action upcall missing cookie");
660         return BAD_UPCALL;
661     }
662     userdata_len = nl_attr_get_size(dpif_upcall->userdata);
663     if (userdata_len < sizeof cookie.type
664         || userdata_len > sizeof cookie) {
665         VLOG_WARN_RL(&rl, "action upcall cookie has unexpected size %"PRIuSIZE,
666                      userdata_len);
667         return BAD_UPCALL;
668     }
669     memset(&cookie, 0, sizeof cookie);
670     memcpy(&cookie, nl_attr_get(dpif_upcall->userdata), userdata_len);
671     if (userdata_len == MAX(8, sizeof cookie.sflow)
672         && cookie.type == USER_ACTION_COOKIE_SFLOW) {
673         return SFLOW_UPCALL;
674     } else if (userdata_len == MAX(8, sizeof cookie.slow_path)
675                && cookie.type == USER_ACTION_COOKIE_SLOW_PATH) {
676         return MISS_UPCALL;
677     } else if (userdata_len == MAX(8, sizeof cookie.flow_sample)
678                && cookie.type == USER_ACTION_COOKIE_FLOW_SAMPLE) {
679         return FLOW_SAMPLE_UPCALL;
680     } else if (userdata_len == MAX(8, sizeof cookie.ipfix)
681                && cookie.type == USER_ACTION_COOKIE_IPFIX) {
682         return IPFIX_UPCALL;
683     } else {
684         VLOG_WARN_RL(&rl, "invalid user cookie of type %"PRIu16
685                      " and size %"PRIuSIZE, cookie.type, userdata_len);
686         return BAD_UPCALL;
687     }
688 }
689
690 /* Calculates slow path actions for 'xout'.  'buf' must statically be
691  * initialized with at least 128 bytes of space. */
692 static void
693 compose_slow_path(struct udpif *udpif, struct xlate_out *xout,
694                   struct flow *flow, odp_port_t odp_in_port,
695                   struct ofpbuf *buf)
696 {
697     union user_action_cookie cookie;
698     odp_port_t port;
699     uint32_t pid;
700
701     cookie.type = USER_ACTION_COOKIE_SLOW_PATH;
702     cookie.slow_path.unused = 0;
703     cookie.slow_path.reason = xout->slow;
704
705     port = xout->slow & (SLOW_CFM | SLOW_BFD | SLOW_LACP | SLOW_STP)
706         ? ODPP_NONE
707         : odp_in_port;
708     pid = dpif_port_get_pid(udpif->dpif, port, flow_hash_5tuple(flow, 0));
709     odp_put_userspace_action(pid, &cookie, sizeof cookie.slow_path, buf);
710 }
711
712 static struct flow_miss *
713 flow_miss_find(struct hmap *todo, const struct ofproto_dpif *ofproto,
714                const struct flow *flow, uint32_t hash)
715 {
716     struct flow_miss *miss;
717
718     HMAP_FOR_EACH_WITH_HASH (miss, hmap_node, hash, todo) {
719         if (miss->ofproto == ofproto && flow_equal(&miss->flow, flow)) {
720             return miss;
721         }
722     }
723
724     return NULL;
725 }
726
727 /* Reads and classifies upcalls.  Returns the number of upcalls successfully
728  * read. */
729 static size_t
730 read_upcalls(struct handler *handler,
731              struct upcall upcalls[FLOW_MISS_MAX_BATCH],
732              struct flow_miss miss_buf[FLOW_MISS_MAX_BATCH],
733              struct hmap *misses)
734 {
735     struct udpif *udpif = handler->udpif;
736     size_t i;
737     size_t n_misses = 0;
738     size_t n_upcalls = 0;
739
740     /*
741      * Try reading FLOW_MISS_MAX_BATCH upcalls from dpif.
742      *
743      * Extract the flow from each upcall.  Construct in 'misses' a hash table
744      * that maps each unique flow to a 'struct flow_miss'.
745      *
746      * Most commonly there is a single packet per flow_miss, but there are
747      * several reasons why there might be more than one, e.g.:
748      *
749      *   - The dpif packet interface does not support TSO (or UFO, etc.), so a
750      *     large packet sent to userspace is split into a sequence of smaller
751      *     ones.
752      *
753      *   - A stream of quickly arriving packets in an established "slow-pathed"
754      *     flow.
755      *
756      *   - Rarely, a stream of quickly arriving packets in a flow not yet
757      *     established.  (This is rare because most protocols do not send
758      *     multiple back-to-back packets before receiving a reply from the
759      *     other end of the connection, which gives OVS a chance to set up a
760      *     datapath flow.)
761      */
762     for (i = 0; i < FLOW_MISS_MAX_BATCH; i++) {
763         struct upcall *upcall = &upcalls[n_upcalls];
764         struct flow_miss *miss = &miss_buf[n_misses];
765         struct dpif_upcall *dupcall;
766         struct ofpbuf *packet;
767         struct flow_miss *existing_miss;
768         struct ofproto_dpif *ofproto;
769         struct dpif_sflow *sflow;
770         struct dpif_ipfix *ipfix;
771         struct flow flow;
772         enum upcall_type type;
773         odp_port_t odp_in_port;
774         int error;
775
776         ofpbuf_use_stub(&upcall->upcall_buf, upcall->upcall_stub,
777                         sizeof upcall->upcall_stub);
778         error = dpif_recv(udpif->dpif, handler->handler_id,
779                           &upcall->dpif_upcall, &upcall->upcall_buf);
780         if (error) {
781             ofpbuf_uninit(&upcall->upcall_buf);
782             break;
783         }
784
785         dupcall = &upcall->dpif_upcall;
786         packet = &dupcall->packet;
787         error = xlate_receive(udpif->backer, packet, dupcall->key,
788                               dupcall->key_len, &flow,
789                               &ofproto, &ipfix, &sflow, NULL, &odp_in_port);
790         if (error) {
791             if (error == ENODEV) {
792                 /* Received packet on datapath port for which we couldn't
793                  * associate an ofproto.  This can happen if a port is removed
794                  * while traffic is being received.  Print a rate-limited
795                  * message in case it happens frequently.  Install a drop flow
796                  * so that future packets of the flow are inexpensively dropped
797                  * in the kernel. */
798                 VLOG_INFO_RL(&rl, "received packet on unassociated datapath "
799                              "port %"PRIu32, odp_in_port);
800                 dpif_flow_put(udpif->dpif, DPIF_FP_CREATE | DPIF_FP_MODIFY,
801                               dupcall->key, dupcall->key_len, NULL, 0, NULL, 0,
802                               NULL);
803             }
804             goto destroy_upcall;
805         }
806
807         type = classify_upcall(upcall);
808         if (type == MISS_UPCALL) {
809             uint32_t hash;
810             struct pkt_metadata md = pkt_metadata_from_flow(&flow);
811
812             flow_extract(packet, &md, &miss->flow);
813             hash = flow_hash(&miss->flow, 0);
814             existing_miss = flow_miss_find(misses, ofproto, &miss->flow,
815                                            hash);
816             if (!existing_miss) {
817                 hmap_insert(misses, &miss->hmap_node, hash);
818                 miss->ofproto = ofproto;
819                 miss->key = dupcall->key;
820                 miss->key_len = dupcall->key_len;
821                 miss->upcall_type = dupcall->type;
822                 miss->stats.n_packets = 0;
823                 miss->stats.n_bytes = 0;
824                 miss->stats.used = time_msec();
825                 miss->stats.tcp_flags = 0;
826                 miss->odp_in_port = odp_in_port;
827                 miss->put = false;
828                 n_misses++;
829             } else {
830                 miss = existing_miss;
831             }
832             miss->stats.tcp_flags |= ntohs(miss->flow.tcp_flags);
833             miss->stats.n_bytes += ofpbuf_size(packet);
834             miss->stats.n_packets++;
835
836             upcall->flow_miss = miss;
837             n_upcalls++;
838             continue;
839         }
840
841         switch (type) {
842         case SFLOW_UPCALL:
843             if (sflow) {
844                 union user_action_cookie cookie;
845
846                 memset(&cookie, 0, sizeof cookie);
847                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
848                        sizeof cookie.sflow);
849                 dpif_sflow_received(sflow, packet, &flow, odp_in_port,
850                                     &cookie);
851             }
852             break;
853         case IPFIX_UPCALL:
854             if (ipfix) {
855                 dpif_ipfix_bridge_sample(ipfix, packet, &flow);
856             }
857             break;
858         case FLOW_SAMPLE_UPCALL:
859             if (ipfix) {
860                 union user_action_cookie cookie;
861
862                 memset(&cookie, 0, sizeof cookie);
863                 memcpy(&cookie, nl_attr_get(dupcall->userdata),
864                        sizeof cookie.flow_sample);
865
866                 /* The flow reflects exactly the contents of the packet.
867                  * Sample the packet using it. */
868                 dpif_ipfix_flow_sample(ipfix, packet, &flow,
869                                        cookie.flow_sample.collector_set_id,
870                                        cookie.flow_sample.probability,
871                                        cookie.flow_sample.obs_domain_id,
872                                        cookie.flow_sample.obs_point_id);
873             }
874             break;
875         case BAD_UPCALL:
876             break;
877         case MISS_UPCALL:
878             OVS_NOT_REACHED();
879         }
880
881         dpif_ipfix_unref(ipfix);
882         dpif_sflow_unref(sflow);
883
884 destroy_upcall:
885         ofpbuf_uninit(&upcall->dpif_upcall.packet);
886         ofpbuf_uninit(&upcall->upcall_buf);
887     }
888
889     return n_upcalls;
890 }
891
892 static void
893 handle_upcalls(struct handler *handler, struct hmap *misses,
894                struct upcall *upcalls, size_t n_upcalls)
895 {
896     struct udpif *udpif = handler->udpif;
897     struct dpif_op *opsp[FLOW_MISS_MAX_BATCH * 2];
898     struct dpif_op ops[FLOW_MISS_MAX_BATCH * 2];
899     struct flow_miss *miss;
900     size_t n_ops, i;
901     unsigned int flow_limit;
902     bool fail_open, may_put;
903
904     atomic_read(&udpif->flow_limit, &flow_limit);
905     may_put = udpif_get_n_flows(udpif) < flow_limit;
906
907     /* Initialize each 'struct flow_miss's ->xout.
908      *
909      * We do this per-flow_miss rather than per-packet because, most commonly,
910      * all the packets in a flow can use the same translation.
911      *
912      * We can't do this in the previous loop because we need the TCP flags for
913      * all the packets in each miss. */
914     fail_open = false;
915     HMAP_FOR_EACH (miss, hmap_node, misses) {
916         struct xlate_in xin;
917
918         xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL,
919                       miss->stats.tcp_flags, NULL);
920         xin.may_learn = true;
921
922         if (miss->upcall_type == DPIF_UC_MISS) {
923             xin.resubmit_stats = &miss->stats;
924         } else {
925             /* For non-miss upcalls, there's a flow in the datapath which this
926              * packet was accounted to.  Presumably the revalidators will deal
927              * with pushing its stats eventually. */
928         }
929
930         xlate_actions(&xin, &miss->xout);
931         fail_open = fail_open || miss->xout.fail_open;
932     }
933
934     /* Now handle the packets individually in order of arrival.  In the common
935      * case each packet of a miss can share the same actions, but slow-pathed
936      * packets need to be translated individually:
937      *
938      *   - For SLOW_CFM, SLOW_LACP, SLOW_STP, and SLOW_BFD, translation is what
939      *     processes received packets for these protocols.
940      *
941      *   - For SLOW_CONTROLLER, translation sends the packet to the OpenFlow
942      *     controller.
943      *
944      * The loop fills 'ops' with an array of operations to execute in the
945      * datapath. */
946     n_ops = 0;
947     for (i = 0; i < n_upcalls; i++) {
948         struct upcall *upcall = &upcalls[i];
949         struct flow_miss *miss = upcall->flow_miss;
950         struct ofpbuf *packet = &upcall->dpif_upcall.packet;
951         struct dpif_op *op;
952         ovs_be16 flow_vlan_tci;
953
954         /* Save a copy of flow.vlan_tci in case it is changed to
955          * generate proper mega flow masks for VLAN splinter flows. */
956         flow_vlan_tci = miss->flow.vlan_tci;
957
958         if (miss->xout.slow) {
959             struct xlate_in xin;
960
961             xlate_in_init(&xin, miss->ofproto, &miss->flow, NULL, 0, packet);
962             xlate_actions_for_side_effects(&xin);
963         }
964
965         if (miss->flow.in_port.ofp_port
966             != vsp_realdev_to_vlandev(miss->ofproto,
967                                       miss->flow.in_port.ofp_port,
968                                       miss->flow.vlan_tci)) {
969             /* This packet was received on a VLAN splinter port.  We
970              * added a VLAN to the packet to make the packet resemble
971              * the flow, but the actions were composed assuming that
972              * the packet contained no VLAN.  So, we must remove the
973              * VLAN header from the packet before trying to execute the
974              * actions. */
975             if (ofpbuf_size(&miss->xout.odp_actions)) {
976                 eth_pop_vlan(packet);
977             }
978
979             /* Remove the flow vlan tags inserted by vlan splinter logic
980              * to ensure megaflow masks generated match the data path flow. */
981             miss->flow.vlan_tci = 0;
982         }
983
984         /* Do not install a flow into the datapath if:
985          *
986          *    - The datapath already has too many flows.
987          *
988          *    - An earlier iteration of this loop already put the same flow.
989          *
990          *    - We received this packet via some flow installed in the kernel
991          *      already. */
992         if (may_put
993             && !miss->put
994             && upcall->dpif_upcall.type == DPIF_UC_MISS) {
995             struct ofpbuf mask;
996             bool megaflow;
997
998             miss->put = true;
999
1000             atomic_read(&enable_megaflows, &megaflow);
1001             ofpbuf_use_stack(&mask, &miss->mask_buf, sizeof miss->mask_buf);
1002             if (megaflow) {
1003                 size_t max_mpls;
1004
1005                 max_mpls = ofproto_dpif_get_max_mpls_depth(miss->ofproto);
1006                 odp_flow_key_from_mask(&mask, &miss->xout.wc.masks,
1007                                        &miss->flow, UINT32_MAX, max_mpls);
1008             }
1009
1010             op = &ops[n_ops++];
1011             op->type = DPIF_OP_FLOW_PUT;
1012             op->u.flow_put.flags = DPIF_FP_CREATE | DPIF_FP_MODIFY;
1013             op->u.flow_put.key = miss->key;
1014             op->u.flow_put.key_len = miss->key_len;
1015             op->u.flow_put.mask = ofpbuf_data(&mask);
1016             op->u.flow_put.mask_len = ofpbuf_size(&mask);
1017             op->u.flow_put.stats = NULL;
1018
1019             if (!miss->xout.slow) {
1020                 op->u.flow_put.actions = ofpbuf_data(&miss->xout.odp_actions);
1021                 op->u.flow_put.actions_len = ofpbuf_size(&miss->xout.odp_actions);
1022             } else {
1023                 struct ofpbuf buf;
1024
1025                 ofpbuf_use_stack(&buf, miss->slow_path_buf,
1026                                  sizeof miss->slow_path_buf);
1027                 compose_slow_path(udpif, &miss->xout, &miss->flow,
1028                                   miss->odp_in_port, &buf);
1029                 op->u.flow_put.actions = ofpbuf_data(&buf);
1030                 op->u.flow_put.actions_len = ofpbuf_size(&buf);
1031             }
1032         }
1033
1034         /*
1035          * The 'miss' may be shared by multiple upcalls. Restore
1036          * the saved flow vlan_tci field before processing the next
1037          * upcall. */
1038         miss->flow.vlan_tci = flow_vlan_tci;
1039
1040         if (ofpbuf_size(&miss->xout.odp_actions)) {
1041
1042             op = &ops[n_ops++];
1043             op->type = DPIF_OP_EXECUTE;
1044             op->u.execute.packet = packet;
1045             odp_key_to_pkt_metadata(miss->key, miss->key_len,
1046                                     &op->u.execute.md);
1047             op->u.execute.actions = ofpbuf_data(&miss->xout.odp_actions);
1048             op->u.execute.actions_len = ofpbuf_size(&miss->xout.odp_actions);
1049             op->u.execute.needs_help = (miss->xout.slow & SLOW_ACTION) != 0;
1050         }
1051     }
1052
1053     /* Special case for fail-open mode.
1054      *
1055      * If we are in fail-open mode, but we are connected to a controller too,
1056      * then we should send the packet up to the controller in the hope that it
1057      * will try to set up a flow and thereby allow us to exit fail-open.
1058      *
1059      * See the top-level comment in fail-open.c for more information.
1060      *
1061      * Copy packets before they are modified by execution. */
1062     if (fail_open) {
1063         for (i = 0; i < n_upcalls; i++) {
1064             struct upcall *upcall = &upcalls[i];
1065             struct flow_miss *miss = upcall->flow_miss;
1066             struct ofpbuf *packet = &upcall->dpif_upcall.packet;
1067             struct ofproto_packet_in *pin;
1068
1069             pin = xmalloc(sizeof *pin);
1070             pin->up.packet = xmemdup(ofpbuf_data(packet), ofpbuf_size(packet));
1071             pin->up.packet_len = ofpbuf_size(packet);
1072             pin->up.reason = OFPR_NO_MATCH;
1073             pin->up.table_id = 0;
1074             pin->up.cookie = OVS_BE64_MAX;
1075             flow_get_metadata(&miss->flow, &pin->up.fmd);
1076             pin->send_len = 0; /* Not used for flow table misses. */
1077             pin->miss_type = OFPROTO_PACKET_IN_NO_MISS;
1078             ofproto_dpif_send_packet_in(miss->ofproto, pin);
1079         }
1080     }
1081
1082     /* Execute batch. */
1083     for (i = 0; i < n_ops; i++) {
1084         opsp[i] = &ops[i];
1085     }
1086     dpif_operate(udpif->dpif, opsp, n_ops);
1087 }
1088
1089 /* Must be called with udpif->ukeys[hash % udpif->n_revalidators].mutex. */
1090 static struct udpif_key *
1091 ukey_lookup__(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1092               uint32_t hash)
1093 {
1094     struct udpif_key *ukey;
1095     struct hmap *hmap = &udpif->ukeys[hash % udpif->n_revalidators].hmap;
1096
1097     HMAP_FOR_EACH_WITH_HASH (ukey, hmap_node, hash, hmap) {
1098         if (ukey->key_len == key_len && !memcmp(ukey->key, key, key_len)) {
1099             return ukey;
1100         }
1101     }
1102     return NULL;
1103 }
1104
1105 static struct udpif_key *
1106 ukey_lookup(struct udpif *udpif, const struct nlattr *key, size_t key_len,
1107             uint32_t hash)
1108 {
1109     struct udpif_key *ukey;
1110     uint32_t idx = hash % udpif->n_revalidators;
1111
1112     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1113     ukey = ukey_lookup__(udpif, key, key_len, hash);
1114     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1115
1116     return ukey;
1117 }
1118
1119 static struct udpif_key *
1120 ukey_create(const struct nlattr *key, size_t key_len, long long int used)
1121 {
1122     struct udpif_key *ukey = xmalloc(sizeof *ukey);
1123     ovs_mutex_init(&ukey->mutex);
1124
1125     ukey->key = (struct nlattr *) &ukey->key_buf;
1126     memcpy(&ukey->key_buf, key, key_len);
1127     ukey->key_len = key_len;
1128
1129     ovs_mutex_lock(&ukey->mutex);
1130     ukey->mark = false;
1131     ukey->flow_exists = true;
1132     ukey->created = used ? used : time_msec();
1133     memset(&ukey->stats, 0, sizeof ukey->stats);
1134     ukey->xcache = NULL;
1135     ovs_mutex_unlock(&ukey->mutex);
1136
1137     return ukey;
1138 }
1139
1140 /* Checks for a ukey in 'udpif->ukeys' with the same 'ukey->key' and 'hash',
1141  * and inserts 'ukey' if it does not exist.
1142  *
1143  * Returns true if 'ukey' was inserted into 'udpif->ukeys', false otherwise. */
1144 static bool
1145 udpif_insert_ukey(struct udpif *udpif, struct udpif_key *ukey, uint32_t hash)
1146 {
1147     struct udpif_key *duplicate;
1148     uint32_t idx = hash % udpif->n_revalidators;
1149     bool ok;
1150
1151     ovs_mutex_lock(&udpif->ukeys[idx].mutex);
1152     duplicate = ukey_lookup__(udpif, ukey->key, ukey->key_len, hash);
1153     if (duplicate) {
1154         ok = false;
1155     } else {
1156         hmap_insert(&udpif->ukeys[idx].hmap, &ukey->hmap_node, hash);
1157         ok = true;
1158     }
1159     ovs_mutex_unlock(&udpif->ukeys[idx].mutex);
1160
1161     return ok;
1162 }
1163
1164 static void
1165 ukey_delete(struct revalidator *revalidator, struct udpif_key *ukey)
1166     OVS_NO_THREAD_SAFETY_ANALYSIS
1167 {
1168     if (revalidator) {
1169         hmap_remove(revalidator->ukeys, &ukey->hmap_node);
1170     }
1171     xlate_cache_delete(ukey->xcache);
1172     ovs_mutex_destroy(&ukey->mutex);
1173     free(ukey);
1174 }
1175
1176 static bool
1177 should_revalidate(const struct udpif *udpif, uint64_t packets,
1178                   long long int used)
1179 {
1180     long long int metric, now, duration;
1181
1182     if (udpif->dump_duration < 200) {
1183         /* We are likely to handle full revalidation for the flows. */
1184         return true;
1185     }
1186
1187     /* Calculate the mean time between seeing these packets. If this
1188      * exceeds the threshold, then delete the flow rather than performing
1189      * costly revalidation for flows that aren't being hit frequently.
1190      *
1191      * This is targeted at situations where the dump_duration is high (~1s),
1192      * and revalidation is triggered by a call to udpif_revalidate(). In
1193      * these situations, revalidation of all flows causes fluctuations in the
1194      * flow_limit due to the interaction with the dump_duration and max_idle.
1195      * This tends to result in deletion of low-throughput flows anyway, so
1196      * skip the revalidation and just delete those flows. */
1197     packets = MAX(packets, 1);
1198     now = MAX(used, time_msec());
1199     duration = now - used;
1200     metric = duration / packets;
1201
1202     if (metric < 200) {
1203         /* The flow is receiving more than ~5pps, so keep it. */
1204         return true;
1205     }
1206     return false;
1207 }
1208
1209 static bool
1210 revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey,
1211                 const struct nlattr *mask, size_t mask_len,
1212                 const struct nlattr *actions, size_t actions_len,
1213                 const struct dpif_flow_stats *stats)
1214     OVS_REQUIRES(ukey->mutex)
1215 {
1216     uint64_t slow_path_buf[128 / 8];
1217     struct xlate_out xout, *xoutp;
1218     struct netflow *netflow;
1219     struct ofproto_dpif *ofproto;
1220     struct dpif_flow_stats push;
1221     struct ofpbuf xout_actions;
1222     struct flow flow, dp_mask;
1223     uint32_t *dp32, *xout32;
1224     odp_port_t odp_in_port;
1225     struct xlate_in xin;
1226     long long int last_used;
1227     int error;
1228     size_t i;
1229     bool may_learn, ok;
1230
1231     ok = false;
1232     xoutp = NULL;
1233     netflow = NULL;
1234
1235     last_used = ukey->stats.used;
1236     push.used = stats->used;
1237     push.tcp_flags = stats->tcp_flags;
1238     push.n_packets = stats->n_packets > ukey->stats.n_packets
1239         ? stats->n_packets - ukey->stats.n_packets
1240         : 0;
1241     push.n_bytes = stats->n_bytes > ukey->stats.n_bytes
1242         ? stats->n_bytes - ukey->stats.n_bytes
1243         : 0;
1244
1245     if (udpif->need_revalidate && last_used
1246         && !should_revalidate(udpif, push.n_packets, last_used)) {
1247         ok = false;
1248         goto exit;
1249     }
1250
1251     /* We will push the stats, so update the ukey stats cache. */
1252     ukey->stats = *stats;
1253     if (!push.n_packets && !udpif->need_revalidate) {
1254         ok = true;
1255         goto exit;
1256     }
1257
1258     may_learn = push.n_packets > 0;
1259     if (ukey->xcache && !udpif->need_revalidate) {
1260         xlate_push_stats(ukey->xcache, may_learn, &push);
1261         ok = true;
1262         goto exit;
1263     }
1264
1265     error = xlate_receive(udpif->backer, NULL, ukey->key, ukey->key_len, &flow,
1266                           &ofproto, NULL, NULL, &netflow, &odp_in_port);
1267     if (error) {
1268         goto exit;
1269     }
1270
1271     if (udpif->need_revalidate) {
1272         xlate_cache_clear(ukey->xcache);
1273     }
1274     if (!ukey->xcache) {
1275         ukey->xcache = xlate_cache_new();
1276     }
1277
1278     xlate_in_init(&xin, ofproto, &flow, NULL, push.tcp_flags, NULL);
1279     xin.resubmit_stats = push.n_packets ? &push : NULL;
1280     xin.xcache = ukey->xcache;
1281     xin.may_learn = may_learn;
1282     xin.skip_wildcards = !udpif->need_revalidate;
1283     xlate_actions(&xin, &xout);
1284     xoutp = &xout;
1285
1286     if (!udpif->need_revalidate) {
1287         ok = true;
1288         goto exit;
1289     }
1290
1291     if (!xout.slow) {
1292         ofpbuf_use_const(&xout_actions, ofpbuf_data(&xout.odp_actions),
1293                          ofpbuf_size(&xout.odp_actions));
1294     } else {
1295         ofpbuf_use_stack(&xout_actions, slow_path_buf, sizeof slow_path_buf);
1296         compose_slow_path(udpif, &xout, &flow, odp_in_port, &xout_actions);
1297     }
1298
1299     if (actions_len != ofpbuf_size(&xout_actions)
1300         || memcmp(ofpbuf_data(&xout_actions), actions, actions_len)) {
1301         goto exit;
1302     }
1303
1304     if (odp_flow_key_to_mask(mask, mask_len, &dp_mask, &flow)
1305         == ODP_FIT_ERROR) {
1306         goto exit;
1307     }
1308
1309     /* Since the kernel is free to ignore wildcarded bits in the mask, we can't
1310      * directly check that the masks are the same.  Instead we check that the
1311      * mask in the kernel is more specific i.e. less wildcarded, than what
1312      * we've calculated here.  This guarantees we don't catch any packets we
1313      * shouldn't with the megaflow. */
1314     dp32 = (uint32_t *) &dp_mask;
1315     xout32 = (uint32_t *) &xout.wc.masks;
1316     for (i = 0; i < FLOW_U32S; i++) {
1317         if ((dp32[i] | xout32[i]) != dp32[i]) {
1318             goto exit;
1319         }
1320     }
1321     ok = true;
1322
1323 exit:
1324     if (netflow) {
1325         if (!ok) {
1326             netflow_flow_clear(netflow, &flow);
1327         }
1328         netflow_unref(netflow);
1329     }
1330     xlate_out_uninit(xoutp);
1331     return ok;
1332 }
1333
1334 struct dump_op {
1335     struct udpif_key *ukey;
1336     struct dpif_flow_stats stats; /* Stats for 'op'. */
1337     struct dpif_op op;            /* Flow del operation. */
1338 };
1339
1340 static void
1341 dump_op_init(struct dump_op *op, const struct nlattr *key, size_t key_len,
1342              struct udpif_key *ukey)
1343 {
1344     op->ukey = ukey;
1345     op->op.type = DPIF_OP_FLOW_DEL;
1346     op->op.u.flow_del.key = key;
1347     op->op.u.flow_del.key_len = key_len;
1348     op->op.u.flow_del.stats = &op->stats;
1349 }
1350
1351 static void
1352 push_dump_ops__(struct udpif *udpif, struct dump_op *ops, size_t n_ops)
1353 {
1354     struct dpif_op *opsp[REVALIDATE_MAX_BATCH];
1355     size_t i;
1356
1357     ovs_assert(n_ops <= REVALIDATE_MAX_BATCH);
1358     for (i = 0; i < n_ops; i++) {
1359         opsp[i] = &ops[i].op;
1360     }
1361     dpif_operate(udpif->dpif, opsp, n_ops);
1362
1363     for (i = 0; i < n_ops; i++) {
1364         struct dump_op *op = &ops[i];
1365         struct dpif_flow_stats *push, *stats, push_buf;
1366
1367         stats = op->op.u.flow_del.stats;
1368         if (op->ukey) {
1369             push = &push_buf;
1370             ovs_mutex_lock(&op->ukey->mutex);
1371             push->used = MAX(stats->used, op->ukey->stats.used);
1372             push->tcp_flags = stats->tcp_flags | op->ukey->stats.tcp_flags;
1373             push->n_packets = stats->n_packets - op->ukey->stats.n_packets;
1374             push->n_bytes = stats->n_bytes - op->ukey->stats.n_bytes;
1375             ovs_mutex_unlock(&op->ukey->mutex);
1376         } else {
1377             push = stats;
1378         }
1379
1380         if (push->n_packets || netflow_exists()) {
1381             struct ofproto_dpif *ofproto;
1382             struct netflow *netflow;
1383             struct flow flow;
1384             bool may_learn;
1385
1386             may_learn = push->n_packets > 0;
1387             if (op->ukey) {
1388                 ovs_mutex_lock(&op->ukey->mutex);
1389                 if (op->ukey->xcache) {
1390                     xlate_push_stats(op->ukey->xcache, may_learn, push);
1391                     ovs_mutex_unlock(&op->ukey->mutex);
1392                     continue;
1393                 }
1394                 ovs_mutex_unlock(&op->ukey->mutex);
1395             }
1396
1397             if (!xlate_receive(udpif->backer, NULL, op->op.u.flow_del.key,
1398                                op->op.u.flow_del.key_len, &flow, &ofproto,
1399                                NULL, NULL, &netflow, NULL)) {
1400                 struct xlate_in xin;
1401
1402                 xlate_in_init(&xin, ofproto, &flow, NULL, push->tcp_flags,
1403                               NULL);
1404                 xin.resubmit_stats = push->n_packets ? push : NULL;
1405                 xin.may_learn = may_learn;
1406                 xin.skip_wildcards = true;
1407                 xlate_actions_for_side_effects(&xin);
1408
1409                 if (netflow) {
1410                     netflow_flow_clear(netflow, &flow);
1411                     netflow_unref(netflow);
1412                 }
1413             }
1414         }
1415     }
1416 }
1417
1418 static void
1419 push_dump_ops(struct revalidator *revalidator,
1420               struct dump_op *ops, size_t n_ops)
1421 {
1422     int i;
1423
1424     push_dump_ops__(revalidator->udpif, ops, n_ops);
1425     for (i = 0; i < n_ops; i++) {
1426         ukey_delete(revalidator, ops[i].ukey);
1427     }
1428 }
1429
1430 static void
1431 revalidate(struct revalidator *revalidator)
1432 {
1433     struct udpif *udpif = revalidator->udpif;
1434
1435     struct dump_op ops[REVALIDATE_MAX_BATCH];
1436     const struct nlattr *key, *mask, *actions;
1437     size_t key_len, mask_len, actions_len;
1438     const struct dpif_flow_stats *stats;
1439     long long int now;
1440     unsigned int flow_limit;
1441     size_t n_ops;
1442     void *state;
1443
1444     n_ops = 0;
1445     now = time_msec();
1446     atomic_read(&udpif->flow_limit, &flow_limit);
1447
1448     dpif_flow_dump_state_init(udpif->dpif, &state);
1449     while (dpif_flow_dump_next(&udpif->dump, state, &key, &key_len, &mask,
1450                                &mask_len, &actions, &actions_len, &stats)) {
1451         struct udpif_key *ukey;
1452         bool mark, may_destroy;
1453         long long int used, max_idle;
1454         uint32_t hash;
1455         size_t n_flows;
1456
1457         hash = hash_bytes(key, key_len, udpif->secret);
1458         ukey = ukey_lookup(udpif, key, key_len, hash);
1459
1460         used = stats->used;
1461         if (!ukey) {
1462             ukey = ukey_create(key, key_len, used);
1463             if (!udpif_insert_ukey(udpif, ukey, hash)) {
1464                 /* The same ukey has already been created. This means that
1465                  * another revalidator is processing this flow
1466                  * concurrently, so don't bother processing it. */
1467                 COVERAGE_INC(upcall_duplicate_flow);
1468                 ukey_delete(NULL, ukey);
1469                 goto next;
1470             }
1471         }
1472
1473         if (ovs_mutex_trylock(&ukey->mutex)) {
1474             /* The flow has been dumped, and is being handled by another
1475              * revalidator concurrently. This can occasionally occur if the
1476              * datapath is changed in the middle of a flow dump. Rather than
1477              * perform the same work twice, skip the flow this time. */
1478             COVERAGE_INC(upcall_duplicate_flow);
1479             goto next;
1480         }
1481
1482         if (ukey->mark || !ukey->flow_exists) {
1483             /* The flow has already been dumped and handled by another
1484              * revalidator during this flow dump operation. Skip it. */
1485             COVERAGE_INC(upcall_duplicate_flow);
1486             ovs_mutex_unlock(&ukey->mutex);
1487             goto next;
1488         }
1489
1490         if (!used) {
1491             used = ukey->created;
1492         }
1493         n_flows = udpif_get_n_flows(udpif);
1494         max_idle = ofproto_max_idle;
1495         if (n_flows > flow_limit) {
1496             max_idle = 100;
1497         }
1498
1499         if ((used && used < now - max_idle) || n_flows > flow_limit * 2) {
1500             mark = false;
1501         } else {
1502             mark = revalidate_ukey(udpif, ukey, mask, mask_len, actions,
1503                                    actions_len, stats);
1504         }
1505         ukey->mark = ukey->flow_exists = mark;
1506
1507         if (!mark) {
1508             dump_op_init(&ops[n_ops++], key, key_len, ukey);
1509         }
1510         ovs_mutex_unlock(&ukey->mutex);
1511
1512     next:
1513         may_destroy = dpif_flow_dump_next_may_destroy_keys(&udpif->dump,
1514                                                            state);
1515
1516         /* Only update 'now' immediately before 'buffer' will be updated.
1517          * This gives us the current time relative to the time the datapath
1518          * will write into 'stats'. */
1519         if (may_destroy) {
1520             now = time_msec();
1521         }
1522
1523         /* Only do a dpif_operate when we've hit our maximum batch, or when our
1524          * memory is about to be clobbered by the next call to
1525          * dpif_flow_dump_next(). */
1526         if (n_ops == REVALIDATE_MAX_BATCH || (n_ops && may_destroy)) {
1527             push_dump_ops__(udpif, ops, n_ops);
1528             n_ops = 0;
1529         }
1530     }
1531
1532     if (n_ops) {
1533         push_dump_ops__(udpif, ops, n_ops);
1534     }
1535
1536     dpif_flow_dump_state_uninit(udpif->dpif, state);
1537 }
1538
1539 static void
1540 revalidator_sweep__(struct revalidator *revalidator, bool purge)
1541     OVS_NO_THREAD_SAFETY_ANALYSIS
1542 {
1543     struct dump_op ops[REVALIDATE_MAX_BATCH];
1544     struct udpif_key *ukey, *next;
1545     size_t n_ops;
1546
1547     n_ops = 0;
1548
1549     /* During garbage collection, this revalidator completely owns its ukeys
1550      * map, and therefore doesn't need to do any locking. */
1551     HMAP_FOR_EACH_SAFE (ukey, next, hmap_node, revalidator->ukeys) {
1552         if (!purge && ukey->mark) {
1553             ukey->mark = false;
1554         } else if (!ukey->flow_exists) {
1555             ukey_delete(revalidator, ukey);
1556         } else {
1557             struct dump_op *op = &ops[n_ops++];
1558
1559             /* If we have previously seen a flow in the datapath, but didn't
1560              * see it during the most recent dump, delete it. This allows us
1561              * to clean up the ukey and keep the statistics consistent. */
1562             dump_op_init(op, ukey->key, ukey->key_len, ukey);
1563             if (n_ops == REVALIDATE_MAX_BATCH) {
1564                 push_dump_ops(revalidator, ops, n_ops);
1565                 n_ops = 0;
1566             }
1567         }
1568     }
1569
1570     if (n_ops) {
1571         push_dump_ops(revalidator, ops, n_ops);
1572     }
1573 }
1574
1575 static void
1576 revalidator_sweep(struct revalidator *revalidator)
1577 {
1578     revalidator_sweep__(revalidator, false);
1579 }
1580
1581 static void
1582 revalidator_purge(struct revalidator *revalidator)
1583 {
1584     revalidator_sweep__(revalidator, true);
1585 }
1586 \f
1587 static void
1588 upcall_unixctl_show(struct unixctl_conn *conn, int argc OVS_UNUSED,
1589                     const char *argv[] OVS_UNUSED, void *aux OVS_UNUSED)
1590 {
1591     struct ds ds = DS_EMPTY_INITIALIZER;
1592     struct udpif *udpif;
1593
1594     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1595         unsigned int flow_limit;
1596         size_t i;
1597
1598         atomic_read(&udpif->flow_limit, &flow_limit);
1599
1600         ds_put_format(&ds, "%s:\n", dpif_name(udpif->dpif));
1601         ds_put_format(&ds, "\tflows         : (current %lu)"
1602             " (avg %u) (max %u) (limit %u)\n", udpif_get_n_flows(udpif),
1603             udpif->avg_n_flows, udpif->max_n_flows, flow_limit);
1604         ds_put_format(&ds, "\tdump duration : %lldms\n", udpif->dump_duration);
1605
1606         ds_put_char(&ds, '\n');
1607         for (i = 0; i < n_revalidators; i++) {
1608             struct revalidator *revalidator = &udpif->revalidators[i];
1609
1610             ovs_mutex_lock(&udpif->ukeys[i].mutex);
1611             ds_put_format(&ds, "\t%u: (keys %"PRIuSIZE")\n",
1612                           revalidator->id, hmap_count(&udpif->ukeys[i].hmap));
1613             ovs_mutex_unlock(&udpif->ukeys[i].mutex);
1614         }
1615     }
1616
1617     unixctl_command_reply(conn, ds_cstr(&ds));
1618     ds_destroy(&ds);
1619 }
1620
1621 /* Disable using the megaflows.
1622  *
1623  * This command is only needed for advanced debugging, so it's not
1624  * documented in the man page. */
1625 static void
1626 upcall_unixctl_disable_megaflows(struct unixctl_conn *conn,
1627                                  int argc OVS_UNUSED,
1628                                  const char *argv[] OVS_UNUSED,
1629                                  void *aux OVS_UNUSED)
1630 {
1631     atomic_store(&enable_megaflows, false);
1632     udpif_flush_all_datapaths();
1633     unixctl_command_reply(conn, "megaflows disabled");
1634 }
1635
1636 /* Re-enable using megaflows.
1637  *
1638  * This command is only needed for advanced debugging, so it's not
1639  * documented in the man page. */
1640 static void
1641 upcall_unixctl_enable_megaflows(struct unixctl_conn *conn,
1642                                 int argc OVS_UNUSED,
1643                                 const char *argv[] OVS_UNUSED,
1644                                 void *aux OVS_UNUSED)
1645 {
1646     atomic_store(&enable_megaflows, true);
1647     udpif_flush_all_datapaths();
1648     unixctl_command_reply(conn, "megaflows enabled");
1649 }
1650
1651 /* Set the flow limit.
1652  *
1653  * This command is only needed for advanced debugging, so it's not
1654  * documented in the man page. */
1655 static void
1656 upcall_unixctl_set_flow_limit(struct unixctl_conn *conn,
1657                               int argc OVS_UNUSED,
1658                               const char *argv[] OVS_UNUSED,
1659                               void *aux OVS_UNUSED)
1660 {
1661     struct ds ds = DS_EMPTY_INITIALIZER;
1662     struct udpif *udpif;
1663     unsigned int flow_limit = atoi(argv[1]);
1664
1665     LIST_FOR_EACH (udpif, list_node, &all_udpifs) {
1666         atomic_store(&udpif->flow_limit, flow_limit);
1667     }
1668     ds_put_format(&ds, "set flow_limit to %u\n", flow_limit);
1669     unixctl_command_reply(conn, ds_cstr(&ds));
1670     ds_destroy(&ds);
1671 }