dpif-netdev: Batch megaflow lookup.
[cascardo/ovs.git] / lib / dpif-netdev.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "dpif.h"
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <netinet/in.h>
25 #include <sys/socket.h>
26 #include <net/if.h>
27 #include <stdint.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33
34 #include "classifier.h"
35 #include "cmap.h"
36 #include "csum.h"
37 #include "dpif.h"
38 #include "dpif-provider.h"
39 #include "dummy.h"
40 #include "dynamic-string.h"
41 #include "flow.h"
42 #include "hmap.h"
43 #include "latch.h"
44 #include "list.h"
45 #include "meta-flow.h"
46 #include "netdev.h"
47 #include "netdev-dpdk.h"
48 #include "netdev-vport.h"
49 #include "netlink.h"
50 #include "odp-execute.h"
51 #include "odp-util.h"
52 #include "ofp-print.h"
53 #include "ofpbuf.h"
54 #include "ovs-rcu.h"
55 #include "packet-dpif.h"
56 #include "packets.h"
57 #include "poll-loop.h"
58 #include "random.h"
59 #include "seq.h"
60 #include "shash.h"
61 #include "sset.h"
62 #include "timeval.h"
63 #include "unixctl.h"
64 #include "util.h"
65 #include "vlog.h"
66
67 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
68
69 /* By default, choose a priority in the middle. */
70 #define NETDEV_RULE_PRIORITY 0x8000
71
72 #define FLOW_DUMP_MAX_BATCH 50
73 #define NR_THREADS 1
74 /* Use per thread recirc_depth to prevent recirculation loop. */
75 #define MAX_RECIRC_DEPTH 5
76 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
77
78 /* Configuration parameters. */
79 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
80
81 /* Queues. */
82 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
83 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
84 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
85
86 /* Protects against changes to 'dp_netdevs'. */
87 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
88
89 /* Contains all 'struct dp_netdev's. */
90 static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
91     = SHASH_INITIALIZER(&dp_netdevs);
92
93 struct dp_netdev_upcall {
94     struct dpif_upcall upcall;  /* Queued upcall information. */
95     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
96 };
97
98 /* A queue passing packets from a struct dp_netdev to its clients (handlers).
99  *
100  *
101  * Thread-safety
102  * =============
103  *
104  * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
105  * its own mutex. */
106 struct dp_netdev_queue {
107     struct ovs_mutex mutex;
108     struct seq *seq;      /* Incremented whenever a packet is queued. */
109     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
110     unsigned int head OVS_GUARDED;
111     unsigned int tail OVS_GUARDED;
112 };
113
114 /* Datapath based on the network device interface from netdev.h.
115  *
116  *
117  * Thread-safety
118  * =============
119  *
120  * Some members, marked 'const', are immutable.  Accessing other members
121  * requires synchronization, as noted in more detail below.
122  *
123  * Acquisition order is, from outermost to innermost:
124  *
125  *    dp_netdev_mutex (global)
126  *    port_mutex
127  *    flow_mutex
128  *    cls.rwlock
129  *    queue_rwlock
130  */
131 struct dp_netdev {
132     const struct dpif_class *const class;
133     const char *const name;
134     struct ovs_refcount ref_cnt;
135     atomic_flag destroyed;
136
137     /* Flows.
138      *
139      * Readers of 'cls' and 'flow_table' must take a 'cls->rwlock' read lock.
140      *
141      * Writers of 'cls' and 'flow_table' must take the 'flow_mutex' and then
142      * the 'cls->rwlock' write lock.  (The outer 'flow_mutex' allows writers to
143      * atomically perform multiple operations on 'cls' and 'flow_table'.)
144      */
145     struct ovs_mutex flow_mutex;
146     struct classifier cls;      /* Classifier.  Protected by cls.rwlock. */
147     struct hmap flow_table OVS_GUARDED; /* Flow table. */
148
149     /* Queues.
150      *
151      * 'queue_rwlock' protects the modification of 'handler_queues' and
152      * 'n_handlers'.  The queue elements are protected by its
153      * 'handler_queues''s mutex. */
154     struct fat_rwlock queue_rwlock;
155     struct dp_netdev_queue *handler_queues;
156     uint32_t n_handlers;
157
158     /* Statistics.
159      *
160      * ovsthread_stats is internally synchronized. */
161     struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
162
163     /* Ports.
164      *
165      * Protected by RCU.  Take the mutex to add or remove ports. */
166     struct ovs_mutex port_mutex;
167     struct cmap ports;
168     struct seq *port_seq;       /* Incremented whenever a port changes. */
169
170     /* Forwarding threads. */
171     struct latch exit_latch;
172     struct pmd_thread *pmd_threads;
173     size_t n_pmd_threads;
174     int pmd_count;
175 };
176
177 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
178                                                     odp_port_t);
179
180 enum dp_stat_type {
181     DP_STAT_HIT,                /* Packets that matched in the flow table. */
182     DP_STAT_MISS,               /* Packets that did not match. */
183     DP_STAT_LOST,               /* Packets not passed up to the client. */
184     DP_N_STATS
185 };
186
187 /* Contained by struct dp_netdev's 'stats' member.  */
188 struct dp_netdev_stats {
189     struct ovs_mutex mutex;          /* Protects 'n'. */
190
191     /* Indexed by DP_STAT_*, protected by 'mutex'. */
192     unsigned long long int n[DP_N_STATS] OVS_GUARDED;
193 };
194
195
196 /* A port in a netdev-based datapath. */
197 struct dp_netdev_port {
198     struct cmap_node node;      /* Node in dp_netdev's 'ports'. */
199     odp_port_t port_no;
200     struct netdev *netdev;
201     struct netdev_saved_flags *sf;
202     struct netdev_rxq **rxq;
203     struct ovs_refcount ref_cnt;
204     char *type;                 /* Port type as requested by user. */
205 };
206
207
208 /* Stores a miniflow */
209
210 /* There are fields in the flow structure that we never use. Therefore we can
211  * save a few words of memory */
212 #define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S \
213                                  - FLOW_U32_SIZE(regs) \
214                                  - FLOW_U32_SIZE(metadata) \
215                                 )
216 struct netdev_flow_key {
217     struct miniflow flow;
218     uint32_t buf[NETDEV_KEY_BUF_SIZE_U32];
219 };
220
221 /* A flow in dp_netdev's 'flow_table'.
222  *
223  *
224  * Thread-safety
225  * =============
226  *
227  * Except near the beginning or ending of its lifespan, rule 'rule' belongs to
228  * its dp_netdev's classifier.  The text below calls this classifier 'cls'.
229  *
230  * Motivation
231  * ----------
232  *
233  * The thread safety rules described here for "struct dp_netdev_flow" are
234  * motivated by two goals:
235  *
236  *    - Prevent threads that read members of "struct dp_netdev_flow" from
237  *      reading bad data due to changes by some thread concurrently modifying
238  *      those members.
239  *
240  *    - Prevent two threads making changes to members of a given "struct
241  *      dp_netdev_flow" from interfering with each other.
242  *
243  *
244  * Rules
245  * -----
246  *
247  * A flow 'flow' may be accessed without a risk of being freed by code that
248  * holds a read-lock or write-lock on 'cls->rwlock' or that owns a reference to
249  * 'flow->ref_cnt' (or both).  Code that needs to hold onto a flow for a while
250  * should take 'cls->rwlock', find the flow it needs, increment 'flow->ref_cnt'
251  * with dpif_netdev_flow_ref(), and drop 'cls->rwlock'.
252  *
253  * 'flow->ref_cnt' protects 'flow' from being freed.  It doesn't protect the
254  * flow from being deleted from 'cls' (that's 'cls->rwlock') and it doesn't
255  * protect members of 'flow' from modification.
256  *
257  * Some members, marked 'const', are immutable.  Accessing other members
258  * requires synchronization, as noted in more detail below.
259  */
260 struct dp_netdev_flow {
261     /* Packet classification. */
262     const struct cls_rule cr;   /* In owning dp_netdev's 'cls'. */
263
264     /* Hash table index by unmasked flow. */
265     const struct hmap_node node; /* In owning dp_netdev's 'flow_table'. */
266     const struct flow flow;      /* The flow that created this entry. */
267
268     /* Statistics.
269      *
270      * Reading or writing these members requires 'mutex'. */
271     struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
272
273     /* Actions. */
274     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
275 };
276
277 static void dp_netdev_flow_free(struct dp_netdev_flow *);
278
279 /* Contained by struct dp_netdev_flow's 'stats' member.  */
280 struct dp_netdev_flow_stats {
281     struct ovs_mutex mutex;         /* Guards all the other members. */
282
283     long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
284     long long int packet_count OVS_GUARDED; /* Number of packets matched. */
285     long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
286     uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
287 };
288
289 /* A set of datapath actions within a "struct dp_netdev_flow".
290  *
291  *
292  * Thread-safety
293  * =============
294  *
295  * A struct dp_netdev_actions 'actions' is protected with RCU. */
296 struct dp_netdev_actions {
297     /* These members are immutable: they do not change during the struct's
298      * lifetime.  */
299     struct nlattr *actions;     /* Sequence of OVS_ACTION_ATTR_* attributes. */
300     unsigned int size;          /* Size of 'actions', in bytes. */
301 };
302
303 struct dp_netdev_actions *dp_netdev_actions_create(const struct nlattr *,
304                                                    size_t);
305 struct dp_netdev_actions *dp_netdev_flow_get_actions(
306     const struct dp_netdev_flow *);
307 static void dp_netdev_actions_free(struct dp_netdev_actions *);
308
309 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
310  * the performance overhead of interrupt processing.  Therefore netdev can
311  * not implement rx-wait for these devices.  dpif-netdev needs to poll
312  * these device to check for recv buffer.  pmd-thread does polling for
313  * devices assigned to itself thread.
314  *
315  * DPDK used PMD for accessing NIC.
316  *
317  * A thread that receives packets from PMD ports, looks them up in the flow
318  * table, and executes the actions it finds.
319  **/
320 struct pmd_thread {
321     struct dp_netdev *dp;
322     pthread_t thread;
323     int id;
324     atomic_uint change_seq;
325 };
326
327 /* Interface to netdev-based datapath. */
328 struct dpif_netdev {
329     struct dpif dpif;
330     struct dp_netdev *dp;
331     uint64_t last_port_seq;
332 };
333
334 static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
335                               struct dp_netdev_port **portp);
336 static int get_port_by_name(struct dp_netdev *dp, const char *devname,
337                             struct dp_netdev_port **portp);
338 static void dp_netdev_free(struct dp_netdev *)
339     OVS_REQUIRES(dp_netdev_mutex);
340 static void dp_netdev_flow_flush(struct dp_netdev *);
341 static int do_add_port(struct dp_netdev *dp, const char *devname,
342                        const char *type, odp_port_t port_no)
343     OVS_REQUIRES(dp->port_mutex);
344 static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
345     OVS_REQUIRES(dp->port_mutex);
346 static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
347     OVS_REQ_WRLOCK(dp->queue_rwlock);
348 static int dpif_netdev_open(const struct dpif_class *, const char *name,
349                             bool create, struct dpif **);
350 static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf **,
351                                       int cnt, int queue_no, int type,
352                                       const struct miniflow *,
353                                       const struct nlattr *userdata);
354 static void dp_netdev_execute_actions(struct dp_netdev *dp,
355                                       struct dpif_packet **, int c,
356                                       bool may_steal, struct pkt_metadata *,
357                                       const struct nlattr *actions,
358                                       size_t actions_len);
359 static void dp_netdev_port_input(struct dp_netdev *dp,
360                                  struct dpif_packet **packets, int cnt,
361                                  odp_port_t port_no);
362
363 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
364
365 static struct dpif_netdev *
366 dpif_netdev_cast(const struct dpif *dpif)
367 {
368     ovs_assert(dpif->dpif_class->open == dpif_netdev_open);
369     return CONTAINER_OF(dpif, struct dpif_netdev, dpif);
370 }
371
372 static struct dp_netdev *
373 get_dp_netdev(const struct dpif *dpif)
374 {
375     return dpif_netdev_cast(dpif)->dp;
376 }
377
378 static int
379 dpif_netdev_enumerate(struct sset *all_dps,
380                       const struct dpif_class *dpif_class)
381 {
382     struct shash_node *node;
383
384     ovs_mutex_lock(&dp_netdev_mutex);
385     SHASH_FOR_EACH(node, &dp_netdevs) {
386         struct dp_netdev *dp = node->data;
387         if (dpif_class != dp->class) {
388             /* 'dp_netdevs' contains both "netdev" and "dummy" dpifs.
389              * If the class doesn't match, skip this dpif. */
390              continue;
391         }
392         sset_add(all_dps, node->name);
393     }
394     ovs_mutex_unlock(&dp_netdev_mutex);
395
396     return 0;
397 }
398
399 static bool
400 dpif_netdev_class_is_dummy(const struct dpif_class *class)
401 {
402     return class != &dpif_netdev_class;
403 }
404
405 static const char *
406 dpif_netdev_port_open_type(const struct dpif_class *class, const char *type)
407 {
408     return strcmp(type, "internal") ? type
409                   : dpif_netdev_class_is_dummy(class) ? "dummy"
410                   : "tap";
411 }
412
413 static struct dpif *
414 create_dpif_netdev(struct dp_netdev *dp)
415 {
416     uint16_t netflow_id = hash_string(dp->name, 0);
417     struct dpif_netdev *dpif;
418
419     ovs_refcount_ref(&dp->ref_cnt);
420
421     dpif = xmalloc(sizeof *dpif);
422     dpif_init(&dpif->dpif, dp->class, dp->name, netflow_id >> 8, netflow_id);
423     dpif->dp = dp;
424     dpif->last_port_seq = seq_read(dp->port_seq);
425
426     return &dpif->dpif;
427 }
428
429 /* Choose an unused, non-zero port number and return it on success.
430  * Return ODPP_NONE on failure. */
431 static odp_port_t
432 choose_port(struct dp_netdev *dp, const char *name)
433     OVS_REQUIRES(dp->port_mutex)
434 {
435     uint32_t port_no;
436
437     if (dp->class != &dpif_netdev_class) {
438         const char *p;
439         int start_no = 0;
440
441         /* If the port name begins with "br", start the number search at
442          * 100 to make writing tests easier. */
443         if (!strncmp(name, "br", 2)) {
444             start_no = 100;
445         }
446
447         /* If the port name contains a number, try to assign that port number.
448          * This can make writing unit tests easier because port numbers are
449          * predictable. */
450         for (p = name; *p != '\0'; p++) {
451             if (isdigit((unsigned char) *p)) {
452                 port_no = start_no + strtol(p, NULL, 10);
453                 if (port_no > 0 && port_no != odp_to_u32(ODPP_NONE)
454                     && !dp_netdev_lookup_port(dp, u32_to_odp(port_no))) {
455                     return u32_to_odp(port_no);
456                 }
457                 break;
458             }
459         }
460     }
461
462     for (port_no = 1; port_no <= UINT16_MAX; port_no++) {
463         if (!dp_netdev_lookup_port(dp, u32_to_odp(port_no))) {
464             return u32_to_odp(port_no);
465         }
466     }
467
468     return ODPP_NONE;
469 }
470
471 static int
472 create_dp_netdev(const char *name, const struct dpif_class *class,
473                  struct dp_netdev **dpp)
474     OVS_REQUIRES(dp_netdev_mutex)
475 {
476     struct dp_netdev *dp;
477     int error;
478
479     dp = xzalloc(sizeof *dp);
480     shash_add(&dp_netdevs, name, dp);
481
482     *CONST_CAST(const struct dpif_class **, &dp->class) = class;
483     *CONST_CAST(const char **, &dp->name) = xstrdup(name);
484     ovs_refcount_init(&dp->ref_cnt);
485     atomic_flag_clear(&dp->destroyed);
486
487     ovs_mutex_init(&dp->flow_mutex);
488     classifier_init(&dp->cls, NULL);
489     hmap_init(&dp->flow_table);
490
491     fat_rwlock_init(&dp->queue_rwlock);
492
493     ovsthread_stats_init(&dp->stats);
494
495     ovs_mutex_init(&dp->port_mutex);
496     cmap_init(&dp->ports);
497     dp->port_seq = seq_create();
498     latch_init(&dp->exit_latch);
499
500     ovs_mutex_lock(&dp->port_mutex);
501     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
502     ovs_mutex_unlock(&dp->port_mutex);
503     if (error) {
504         dp_netdev_free(dp);
505         return error;
506     }
507
508     *dpp = dp;
509     return 0;
510 }
511
512 static int
513 dpif_netdev_open(const struct dpif_class *class, const char *name,
514                  bool create, struct dpif **dpifp)
515 {
516     struct dp_netdev *dp;
517     int error;
518
519     ovs_mutex_lock(&dp_netdev_mutex);
520     dp = shash_find_data(&dp_netdevs, name);
521     if (!dp) {
522         error = create ? create_dp_netdev(name, class, &dp) : ENODEV;
523     } else {
524         error = (dp->class != class ? EINVAL
525                  : create ? EEXIST
526                  : 0);
527     }
528     if (!error) {
529         *dpifp = create_dpif_netdev(dp);
530     }
531     ovs_mutex_unlock(&dp_netdev_mutex);
532
533     return error;
534 }
535
536 static void
537 dp_netdev_purge_queues(struct dp_netdev *dp)
538     OVS_REQ_WRLOCK(dp->queue_rwlock)
539 {
540     int i;
541
542     for (i = 0; i < dp->n_handlers; i++) {
543         struct dp_netdev_queue *q = &dp->handler_queues[i];
544
545         ovs_mutex_lock(&q->mutex);
546         while (q->tail != q->head) {
547             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
548             ofpbuf_uninit(&u->upcall.packet);
549             ofpbuf_uninit(&u->buf);
550         }
551         ovs_mutex_unlock(&q->mutex);
552     }
553 }
554
555 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
556  * through the 'dp_netdevs' shash while freeing 'dp'. */
557 static void
558 dp_netdev_free(struct dp_netdev *dp)
559     OVS_REQUIRES(dp_netdev_mutex)
560 {
561     struct dp_netdev_port *port;
562     struct dp_netdev_stats *bucket;
563     int i;
564
565     shash_find_and_delete(&dp_netdevs, dp->name);
566
567     dp_netdev_set_pmd_threads(dp, 0);
568     free(dp->pmd_threads);
569
570     dp_netdev_flow_flush(dp);
571     ovs_mutex_lock(&dp->port_mutex);
572     CMAP_FOR_EACH (port, node, &dp->ports) {
573         do_del_port(dp, port);
574     }
575     ovs_mutex_unlock(&dp->port_mutex);
576
577     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
578         ovs_mutex_destroy(&bucket->mutex);
579         free_cacheline(bucket);
580     }
581     ovsthread_stats_destroy(&dp->stats);
582
583     fat_rwlock_wrlock(&dp->queue_rwlock);
584     dp_netdev_destroy_all_queues(dp);
585     fat_rwlock_unlock(&dp->queue_rwlock);
586
587     fat_rwlock_destroy(&dp->queue_rwlock);
588
589     classifier_destroy(&dp->cls);
590     hmap_destroy(&dp->flow_table);
591     ovs_mutex_destroy(&dp->flow_mutex);
592     seq_destroy(dp->port_seq);
593     cmap_destroy(&dp->ports);
594     latch_destroy(&dp->exit_latch);
595     free(CONST_CAST(char *, dp->name));
596     free(dp);
597 }
598
599 static void
600 dp_netdev_unref(struct dp_netdev *dp)
601 {
602     if (dp) {
603         /* Take dp_netdev_mutex so that, if dp->ref_cnt falls to zero, we can't
604          * get a new reference to 'dp' through the 'dp_netdevs' shash. */
605         ovs_mutex_lock(&dp_netdev_mutex);
606         if (ovs_refcount_unref(&dp->ref_cnt) == 1) {
607             dp_netdev_free(dp);
608         }
609         ovs_mutex_unlock(&dp_netdev_mutex);
610     }
611 }
612
613 static void
614 dpif_netdev_close(struct dpif *dpif)
615 {
616     struct dp_netdev *dp = get_dp_netdev(dpif);
617
618     dp_netdev_unref(dp);
619     free(dpif);
620 }
621
622 static int
623 dpif_netdev_destroy(struct dpif *dpif)
624 {
625     struct dp_netdev *dp = get_dp_netdev(dpif);
626
627     if (!atomic_flag_test_and_set(&dp->destroyed)) {
628         if (ovs_refcount_unref(&dp->ref_cnt) == 1) {
629             /* Can't happen: 'dpif' still owns a reference to 'dp'. */
630             OVS_NOT_REACHED();
631         }
632     }
633
634     return 0;
635 }
636
637 static int
638 dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
639 {
640     struct dp_netdev *dp = get_dp_netdev(dpif);
641     struct dp_netdev_stats *bucket;
642     size_t i;
643
644     fat_rwlock_rdlock(&dp->cls.rwlock);
645     stats->n_flows = hmap_count(&dp->flow_table);
646     fat_rwlock_unlock(&dp->cls.rwlock);
647
648     stats->n_hit = stats->n_missed = stats->n_lost = 0;
649     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
650         ovs_mutex_lock(&bucket->mutex);
651         stats->n_hit += bucket->n[DP_STAT_HIT];
652         stats->n_missed += bucket->n[DP_STAT_MISS];
653         stats->n_lost += bucket->n[DP_STAT_LOST];
654         ovs_mutex_unlock(&bucket->mutex);
655     }
656     stats->n_masks = UINT32_MAX;
657     stats->n_mask_hit = UINT64_MAX;
658
659     return 0;
660 }
661
662 static void
663 dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
664 {
665     int i;
666
667     for (i = 0; i < dp->n_pmd_threads; i++) {
668         struct pmd_thread *f = &dp->pmd_threads[i];
669         int id;
670
671         atomic_add(&f->change_seq, 1, &id);
672    }
673 }
674
675 static uint32_t
676 hash_port_no(odp_port_t port_no)
677 {
678     return hash_int(odp_to_u32(port_no), 0);
679 }
680
681 static int
682 do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
683             odp_port_t port_no)
684     OVS_REQUIRES(dp->port_mutex)
685 {
686     struct netdev_saved_flags *sf;
687     struct dp_netdev_port *port;
688     struct netdev *netdev;
689     enum netdev_flags flags;
690     const char *open_type;
691     int error;
692     int i;
693
694     /* XXX reject devices already in some dp_netdev. */
695
696     /* Open and validate network device. */
697     open_type = dpif_netdev_port_open_type(dp->class, type);
698     error = netdev_open(devname, open_type, &netdev);
699     if (error) {
700         return error;
701     }
702     /* XXX reject non-Ethernet devices */
703
704     netdev_get_flags(netdev, &flags);
705     if (flags & NETDEV_LOOPBACK) {
706         VLOG_ERR("%s: cannot add a loopback device", devname);
707         netdev_close(netdev);
708         return EINVAL;
709     }
710
711     port = xzalloc(sizeof *port);
712     port->port_no = port_no;
713     port->netdev = netdev;
714     port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
715     port->type = xstrdup(type);
716     for (i = 0; i < netdev_n_rxq(netdev); i++) {
717         error = netdev_rxq_open(netdev, &port->rxq[i], i);
718         if (error
719             && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
720             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
721                      devname, ovs_strerror(errno));
722             netdev_close(netdev);
723             return error;
724         }
725     }
726
727     error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
728     if (error) {
729         for (i = 0; i < netdev_n_rxq(netdev); i++) {
730             netdev_rxq_close(port->rxq[i]);
731         }
732         netdev_close(netdev);
733         free(port->rxq);
734         free(port);
735         return error;
736     }
737     port->sf = sf;
738
739     if (netdev_is_pmd(netdev)) {
740         dp->pmd_count++;
741         dp_netdev_set_pmd_threads(dp, NR_THREADS);
742         dp_netdev_reload_pmd_threads(dp);
743     }
744     ovs_refcount_init(&port->ref_cnt);
745
746     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
747     seq_change(dp->port_seq);
748
749     return 0;
750 }
751
752 static int
753 dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev,
754                      odp_port_t *port_nop)
755 {
756     struct dp_netdev *dp = get_dp_netdev(dpif);
757     char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
758     const char *dpif_port;
759     odp_port_t port_no;
760     int error;
761
762     ovs_mutex_lock(&dp->port_mutex);
763     dpif_port = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
764     if (*port_nop != ODPP_NONE) {
765         port_no = *port_nop;
766         error = dp_netdev_lookup_port(dp, *port_nop) ? EBUSY : 0;
767     } else {
768         port_no = choose_port(dp, dpif_port);
769         error = port_no == ODPP_NONE ? EFBIG : 0;
770     }
771     if (!error) {
772         *port_nop = port_no;
773         error = do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no);
774     }
775     ovs_mutex_unlock(&dp->port_mutex);
776
777     return error;
778 }
779
780 static int
781 dpif_netdev_port_del(struct dpif *dpif, odp_port_t port_no)
782 {
783     struct dp_netdev *dp = get_dp_netdev(dpif);
784     int error;
785
786     ovs_mutex_lock(&dp->port_mutex);
787     if (port_no == ODPP_LOCAL) {
788         error = EINVAL;
789     } else {
790         struct dp_netdev_port *port;
791
792         error = get_port_by_number(dp, port_no, &port);
793         if (!error) {
794             do_del_port(dp, port);
795         }
796     }
797     ovs_mutex_unlock(&dp->port_mutex);
798
799     return error;
800 }
801
802 static bool
803 is_valid_port_number(odp_port_t port_no)
804 {
805     return port_no != ODPP_NONE;
806 }
807
808 static struct dp_netdev_port *
809 dp_netdev_lookup_port(const struct dp_netdev *dp, odp_port_t port_no)
810 {
811     struct dp_netdev_port *port;
812
813     CMAP_FOR_EACH_WITH_HASH (port, node, hash_port_no(port_no), &dp->ports) {
814         if (port->port_no == port_no) {
815             return port;
816         }
817     }
818     return NULL;
819 }
820
821 static int
822 get_port_by_number(struct dp_netdev *dp,
823                    odp_port_t port_no, struct dp_netdev_port **portp)
824 {
825     if (!is_valid_port_number(port_no)) {
826         *portp = NULL;
827         return EINVAL;
828     } else {
829         *portp = dp_netdev_lookup_port(dp, port_no);
830         return *portp ? 0 : ENOENT;
831     }
832 }
833
834 static void
835 port_ref(struct dp_netdev_port *port)
836 {
837     if (port) {
838         ovs_refcount_ref(&port->ref_cnt);
839     }
840 }
841
842 static void
843 port_destroy__(struct dp_netdev_port *port)
844 {
845     int n_rxq = netdev_n_rxq(port->netdev);
846     int i;
847
848     netdev_close(port->netdev);
849     netdev_restore_flags(port->sf);
850
851     for (i = 0; i < n_rxq; i++) {
852         netdev_rxq_close(port->rxq[i]);
853     }
854     free(port->rxq);
855     free(port->type);
856     free(port);
857 }
858
859 static void
860 port_unref(struct dp_netdev_port *port)
861 {
862     if (port && ovs_refcount_unref(&port->ref_cnt) == 1) {
863         ovsrcu_postpone(port_destroy__, port);
864     }
865 }
866
867 static int
868 get_port_by_name(struct dp_netdev *dp,
869                  const char *devname, struct dp_netdev_port **portp)
870     OVS_REQUIRES(dp->port_mutex)
871 {
872     struct dp_netdev_port *port;
873
874     CMAP_FOR_EACH (port, node, &dp->ports) {
875         if (!strcmp(netdev_get_name(port->netdev), devname)) {
876             *portp = port;
877             return 0;
878         }
879     }
880     return ENOENT;
881 }
882
883 static void
884 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
885     OVS_REQUIRES(dp->port_mutex)
886 {
887     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
888     seq_change(dp->port_seq);
889     if (netdev_is_pmd(port->netdev)) {
890         dp_netdev_reload_pmd_threads(dp);
891     }
892
893     port_unref(port);
894 }
895
896 static void
897 answer_port_query(const struct dp_netdev_port *port,
898                   struct dpif_port *dpif_port)
899 {
900     dpif_port->name = xstrdup(netdev_get_name(port->netdev));
901     dpif_port->type = xstrdup(port->type);
902     dpif_port->port_no = port->port_no;
903 }
904
905 static int
906 dpif_netdev_port_query_by_number(const struct dpif *dpif, odp_port_t port_no,
907                                  struct dpif_port *dpif_port)
908 {
909     struct dp_netdev *dp = get_dp_netdev(dpif);
910     struct dp_netdev_port *port;
911     int error;
912
913     error = get_port_by_number(dp, port_no, &port);
914     if (!error && dpif_port) {
915         answer_port_query(port, dpif_port);
916     }
917
918     return error;
919 }
920
921 static int
922 dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
923                                struct dpif_port *dpif_port)
924 {
925     struct dp_netdev *dp = get_dp_netdev(dpif);
926     struct dp_netdev_port *port;
927     int error;
928
929     ovs_mutex_lock(&dp->port_mutex);
930     error = get_port_by_name(dp, devname, &port);
931     if (!error && dpif_port) {
932         answer_port_query(port, dpif_port);
933     }
934     ovs_mutex_unlock(&dp->port_mutex);
935
936     return error;
937 }
938
939 static void
940 dp_netdev_flow_free(struct dp_netdev_flow *flow)
941 {
942     struct dp_netdev_flow_stats *bucket;
943     size_t i;
944
945     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
946         ovs_mutex_destroy(&bucket->mutex);
947         free_cacheline(bucket);
948     }
949     ovsthread_stats_destroy(&flow->stats);
950
951     cls_rule_destroy(CONST_CAST(struct cls_rule *, &flow->cr));
952     dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
953     free(flow);
954 }
955
956 static void
957 dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
958     OVS_REQ_WRLOCK(dp->cls.rwlock)
959     OVS_REQUIRES(dp->flow_mutex)
960 {
961     struct cls_rule *cr = CONST_CAST(struct cls_rule *, &flow->cr);
962     struct hmap_node *node = CONST_CAST(struct hmap_node *, &flow->node);
963
964     classifier_remove(&dp->cls, cr);
965     hmap_remove(&dp->flow_table, node);
966     ovsrcu_postpone(dp_netdev_flow_free, flow);
967 }
968
969 static void
970 dp_netdev_flow_flush(struct dp_netdev *dp)
971 {
972     struct dp_netdev_flow *netdev_flow, *next;
973
974     ovs_mutex_lock(&dp->flow_mutex);
975     fat_rwlock_wrlock(&dp->cls.rwlock);
976     HMAP_FOR_EACH_SAFE (netdev_flow, next, node, &dp->flow_table) {
977         dp_netdev_remove_flow(dp, netdev_flow);
978     }
979     fat_rwlock_unlock(&dp->cls.rwlock);
980     ovs_mutex_unlock(&dp->flow_mutex);
981 }
982
983 static int
984 dpif_netdev_flow_flush(struct dpif *dpif)
985 {
986     struct dp_netdev *dp = get_dp_netdev(dpif);
987
988     dp_netdev_flow_flush(dp);
989     return 0;
990 }
991
992 struct dp_netdev_port_state {
993     struct cmap_position position;
994     char *name;
995 };
996
997 static int
998 dpif_netdev_port_dump_start(const struct dpif *dpif OVS_UNUSED, void **statep)
999 {
1000     *statep = xzalloc(sizeof(struct dp_netdev_port_state));
1001     return 0;
1002 }
1003
1004 static int
1005 dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_,
1006                            struct dpif_port *dpif_port)
1007 {
1008     struct dp_netdev_port_state *state = state_;
1009     struct dp_netdev *dp = get_dp_netdev(dpif);
1010     struct cmap_node *node;
1011     int retval;
1012
1013     node = cmap_next_position(&dp->ports, &state->position);
1014     if (node) {
1015         struct dp_netdev_port *port;
1016
1017         port = CONTAINER_OF(node, struct dp_netdev_port, node);
1018
1019         free(state->name);
1020         state->name = xstrdup(netdev_get_name(port->netdev));
1021         dpif_port->name = state->name;
1022         dpif_port->type = port->type;
1023         dpif_port->port_no = port->port_no;
1024
1025         retval = 0;
1026     } else {
1027         retval = EOF;
1028     }
1029
1030     return retval;
1031 }
1032
1033 static int
1034 dpif_netdev_port_dump_done(const struct dpif *dpif OVS_UNUSED, void *state_)
1035 {
1036     struct dp_netdev_port_state *state = state_;
1037     free(state->name);
1038     free(state);
1039     return 0;
1040 }
1041
1042 static int
1043 dpif_netdev_port_poll(const struct dpif *dpif_, char **devnamep OVS_UNUSED)
1044 {
1045     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
1046     uint64_t new_port_seq;
1047     int error;
1048
1049     new_port_seq = seq_read(dpif->dp->port_seq);
1050     if (dpif->last_port_seq != new_port_seq) {
1051         dpif->last_port_seq = new_port_seq;
1052         error = ENOBUFS;
1053     } else {
1054         error = EAGAIN;
1055     }
1056
1057     return error;
1058 }
1059
1060 static void
1061 dpif_netdev_port_poll_wait(const struct dpif *dpif_)
1062 {
1063     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
1064
1065     seq_wait(dpif->dp->port_seq, dpif->last_port_seq);
1066 }
1067
1068 static struct dp_netdev_flow *
1069 dp_netdev_flow_cast(const struct cls_rule *cr)
1070 {
1071     return cr ? CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL;
1072 }
1073
1074 static struct dp_netdev_flow *
1075 dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key)
1076     OVS_REQ_RDLOCK(dp->cls.rwlock)
1077 {
1078     struct dp_netdev_flow *netdev_flow;
1079     struct cls_rule *rule;
1080
1081     classifier_lookup_miniflow_batch(&dp->cls, &key, &rule, 1);
1082     netdev_flow = dp_netdev_flow_cast(rule);
1083
1084     return netdev_flow;
1085 }
1086
1087 static struct dp_netdev_flow *
1088 dp_netdev_find_flow(const struct dp_netdev *dp, const struct flow *flow)
1089     OVS_REQ_RDLOCK(dp->cls.rwlock)
1090 {
1091     struct dp_netdev_flow *netdev_flow;
1092
1093     HMAP_FOR_EACH_WITH_HASH (netdev_flow, node, flow_hash(flow, 0),
1094                              &dp->flow_table) {
1095         if (flow_equal(&netdev_flow->flow, flow)) {
1096             return netdev_flow;
1097         }
1098     }
1099
1100     return NULL;
1101 }
1102
1103 static void
1104 get_dpif_flow_stats(struct dp_netdev_flow *netdev_flow,
1105                     struct dpif_flow_stats *stats)
1106 {
1107     struct dp_netdev_flow_stats *bucket;
1108     size_t i;
1109
1110     memset(stats, 0, sizeof *stats);
1111     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
1112         ovs_mutex_lock(&bucket->mutex);
1113         stats->n_packets += bucket->packet_count;
1114         stats->n_bytes += bucket->byte_count;
1115         stats->used = MAX(stats->used, bucket->used);
1116         stats->tcp_flags |= bucket->tcp_flags;
1117         ovs_mutex_unlock(&bucket->mutex);
1118     }
1119 }
1120
1121 static int
1122 dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
1123                               const struct nlattr *mask_key,
1124                               uint32_t mask_key_len, const struct flow *flow,
1125                               struct flow *mask)
1126 {
1127     if (mask_key_len) {
1128         enum odp_key_fitness fitness;
1129
1130         fitness = odp_flow_key_to_mask(mask_key, mask_key_len, mask, flow);
1131         if (fitness) {
1132             /* This should not happen: it indicates that
1133              * odp_flow_key_from_mask() and odp_flow_key_to_mask()
1134              * disagree on the acceptable form of a mask.  Log the problem
1135              * as an error, with enough details to enable debugging. */
1136             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1137
1138             if (!VLOG_DROP_ERR(&rl)) {
1139                 struct ds s;
1140
1141                 ds_init(&s);
1142                 odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
1143                                 true);
1144                 VLOG_ERR("internal error parsing flow mask %s (%s)",
1145                          ds_cstr(&s), odp_key_fitness_to_string(fitness));
1146                 ds_destroy(&s);
1147             }
1148
1149             return EINVAL;
1150         }
1151     } else {
1152         enum mf_field_id id;
1153         /* No mask key, unwildcard everything except fields whose
1154          * prerequisities are not met. */
1155         memset(mask, 0x0, sizeof *mask);
1156
1157         for (id = 0; id < MFF_N_IDS; ++id) {
1158             /* Skip registers and metadata. */
1159             if (!(id >= MFF_REG0 && id < MFF_REG0 + FLOW_N_REGS)
1160                 && id != MFF_METADATA) {
1161                 const struct mf_field *mf = mf_from_id(id);
1162                 if (mf_are_prereqs_ok(mf, flow)) {
1163                     mf_mask_field(mf, mask);
1164                 }
1165             }
1166         }
1167     }
1168
1169     /* Force unwildcard the in_port.
1170      *
1171      * We need to do this even in the case where we unwildcard "everything"
1172      * above because "everything" only includes the 16-bit OpenFlow port number
1173      * mask->in_port.ofp_port, which only covers half of the 32-bit datapath
1174      * port number mask->in_port.odp_port. */
1175     mask->in_port.odp_port = u32_to_odp(UINT32_MAX);
1176
1177     return 0;
1178 }
1179
1180 static int
1181 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
1182                               struct flow *flow)
1183 {
1184     odp_port_t in_port;
1185
1186     if (odp_flow_key_to_flow(key, key_len, flow)) {
1187         /* This should not happen: it indicates that odp_flow_key_from_flow()
1188          * and odp_flow_key_to_flow() disagree on the acceptable form of a
1189          * flow.  Log the problem as an error, with enough details to enable
1190          * debugging. */
1191         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1192
1193         if (!VLOG_DROP_ERR(&rl)) {
1194             struct ds s;
1195
1196             ds_init(&s);
1197             odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
1198             VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
1199             ds_destroy(&s);
1200         }
1201
1202         return EINVAL;
1203     }
1204
1205     in_port = flow->in_port.odp_port;
1206     if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
1207         return EINVAL;
1208     }
1209
1210     return 0;
1211 }
1212
1213 static int
1214 dpif_netdev_flow_get(const struct dpif *dpif,
1215                      const struct nlattr *nl_key, size_t nl_key_len,
1216                      struct ofpbuf **actionsp, struct dpif_flow_stats *stats)
1217 {
1218     struct dp_netdev *dp = get_dp_netdev(dpif);
1219     struct dp_netdev_flow *netdev_flow;
1220     struct flow key;
1221     int error;
1222
1223     error = dpif_netdev_flow_from_nlattrs(nl_key, nl_key_len, &key);
1224     if (error) {
1225         return error;
1226     }
1227
1228     fat_rwlock_rdlock(&dp->cls.rwlock);
1229     netdev_flow = dp_netdev_find_flow(dp, &key);
1230     fat_rwlock_unlock(&dp->cls.rwlock);
1231
1232     if (netdev_flow) {
1233         if (stats) {
1234             get_dpif_flow_stats(netdev_flow, stats);
1235         }
1236
1237         if (actionsp) {
1238             struct dp_netdev_actions *actions;
1239
1240             actions = dp_netdev_flow_get_actions(netdev_flow);
1241             *actionsp = ofpbuf_clone_data(actions->actions, actions->size);
1242         }
1243      } else {
1244         error = ENOENT;
1245     }
1246
1247     return error;
1248 }
1249
1250 static int
1251 dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *flow,
1252                    const struct flow_wildcards *wc,
1253                    const struct nlattr *actions,
1254                    size_t actions_len)
1255     OVS_REQUIRES(dp->flow_mutex)
1256 {
1257     struct dp_netdev_flow *netdev_flow;
1258     struct match match;
1259
1260     netdev_flow = xzalloc(sizeof *netdev_flow);
1261     *CONST_CAST(struct flow *, &netdev_flow->flow) = *flow;
1262
1263     ovsthread_stats_init(&netdev_flow->stats);
1264
1265     ovsrcu_set(&netdev_flow->actions,
1266                dp_netdev_actions_create(actions, actions_len));
1267
1268     match_init(&match, flow, wc);
1269     cls_rule_init(CONST_CAST(struct cls_rule *, &netdev_flow->cr),
1270                   &match, NETDEV_RULE_PRIORITY);
1271     fat_rwlock_wrlock(&dp->cls.rwlock);
1272     classifier_insert(&dp->cls,
1273                       CONST_CAST(struct cls_rule *, &netdev_flow->cr));
1274     hmap_insert(&dp->flow_table,
1275                 CONST_CAST(struct hmap_node *, &netdev_flow->node),
1276                 flow_hash(flow, 0));
1277     fat_rwlock_unlock(&dp->cls.rwlock);
1278
1279     return 0;
1280 }
1281
1282 static void
1283 clear_stats(struct dp_netdev_flow *netdev_flow)
1284 {
1285     struct dp_netdev_flow_stats *bucket;
1286     size_t i;
1287
1288     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
1289         ovs_mutex_lock(&bucket->mutex);
1290         bucket->used = 0;
1291         bucket->packet_count = 0;
1292         bucket->byte_count = 0;
1293         bucket->tcp_flags = 0;
1294         ovs_mutex_unlock(&bucket->mutex);
1295     }
1296 }
1297
1298 static int
1299 dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
1300 {
1301     struct dp_netdev *dp = get_dp_netdev(dpif);
1302     struct dp_netdev_flow *netdev_flow;
1303     struct flow flow;
1304     struct miniflow miniflow;
1305     struct flow_wildcards wc;
1306     int error;
1307
1308     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &flow);
1309     if (error) {
1310         return error;
1311     }
1312     error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
1313                                           put->mask, put->mask_len,
1314                                           &flow, &wc.masks);
1315     if (error) {
1316         return error;
1317     }
1318     miniflow_init(&miniflow, &flow);
1319
1320     ovs_mutex_lock(&dp->flow_mutex);
1321     fat_rwlock_rdlock(&dp->cls.rwlock);
1322     netdev_flow = dp_netdev_lookup_flow(dp, &miniflow);
1323     fat_rwlock_unlock(&dp->cls.rwlock);
1324     if (!netdev_flow) {
1325         if (put->flags & DPIF_FP_CREATE) {
1326             if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
1327                 if (put->stats) {
1328                     memset(put->stats, 0, sizeof *put->stats);
1329                 }
1330                 error = dp_netdev_flow_add(dp, &flow, &wc, put->actions,
1331                                            put->actions_len);
1332             } else {
1333                 error = EFBIG;
1334             }
1335         } else {
1336             error = ENOENT;
1337         }
1338     } else {
1339         if (put->flags & DPIF_FP_MODIFY
1340             && flow_equal(&flow, &netdev_flow->flow)) {
1341             struct dp_netdev_actions *new_actions;
1342             struct dp_netdev_actions *old_actions;
1343
1344             new_actions = dp_netdev_actions_create(put->actions,
1345                                                    put->actions_len);
1346
1347             old_actions = dp_netdev_flow_get_actions(netdev_flow);
1348             ovsrcu_set(&netdev_flow->actions, new_actions);
1349
1350             if (put->stats) {
1351                 get_dpif_flow_stats(netdev_flow, put->stats);
1352             }
1353             if (put->flags & DPIF_FP_ZERO_STATS) {
1354                 clear_stats(netdev_flow);
1355             }
1356
1357             ovsrcu_postpone(dp_netdev_actions_free, old_actions);
1358         } else if (put->flags & DPIF_FP_CREATE) {
1359             error = EEXIST;
1360         } else {
1361             /* Overlapping flow. */
1362             error = EINVAL;
1363         }
1364     }
1365     ovs_mutex_unlock(&dp->flow_mutex);
1366     miniflow_destroy(&miniflow);
1367
1368     return error;
1369 }
1370
1371 static int
1372 dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
1373 {
1374     struct dp_netdev *dp = get_dp_netdev(dpif);
1375     struct dp_netdev_flow *netdev_flow;
1376     struct flow key;
1377     int error;
1378
1379     error = dpif_netdev_flow_from_nlattrs(del->key, del->key_len, &key);
1380     if (error) {
1381         return error;
1382     }
1383
1384     ovs_mutex_lock(&dp->flow_mutex);
1385     fat_rwlock_wrlock(&dp->cls.rwlock);
1386     netdev_flow = dp_netdev_find_flow(dp, &key);
1387     if (netdev_flow) {
1388         if (del->stats) {
1389             get_dpif_flow_stats(netdev_flow, del->stats);
1390         }
1391         dp_netdev_remove_flow(dp, netdev_flow);
1392     } else {
1393         error = ENOENT;
1394     }
1395     fat_rwlock_unlock(&dp->cls.rwlock);
1396     ovs_mutex_unlock(&dp->flow_mutex);
1397
1398     return error;
1399 }
1400
1401 struct dpif_netdev_flow_dump {
1402     struct dpif_flow_dump up;
1403     uint32_t bucket;
1404     uint32_t offset;
1405     int status;
1406     struct ovs_mutex mutex;
1407 };
1408
1409 static struct dpif_netdev_flow_dump *
1410 dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
1411 {
1412     return CONTAINER_OF(dump, struct dpif_netdev_flow_dump, up);
1413 }
1414
1415 static struct dpif_flow_dump *
1416 dpif_netdev_flow_dump_create(const struct dpif *dpif_)
1417 {
1418     struct dpif_netdev_flow_dump *dump;
1419
1420     dump = xmalloc(sizeof *dump);
1421     dpif_flow_dump_init(&dump->up, dpif_);
1422     dump->bucket = 0;
1423     dump->offset = 0;
1424     dump->status = 0;
1425     ovs_mutex_init(&dump->mutex);
1426
1427     return &dump->up;
1428 }
1429
1430 static int
1431 dpif_netdev_flow_dump_destroy(struct dpif_flow_dump *dump_)
1432 {
1433     struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_);
1434
1435     ovs_mutex_destroy(&dump->mutex);
1436     free(dump);
1437     return 0;
1438 }
1439
1440 struct dpif_netdev_flow_dump_thread {
1441     struct dpif_flow_dump_thread up;
1442     struct dpif_netdev_flow_dump *dump;
1443     struct odputil_keybuf keybuf[FLOW_DUMP_MAX_BATCH];
1444     struct odputil_keybuf maskbuf[FLOW_DUMP_MAX_BATCH];
1445 };
1446
1447 static struct dpif_netdev_flow_dump_thread *
1448 dpif_netdev_flow_dump_thread_cast(struct dpif_flow_dump_thread *thread)
1449 {
1450     return CONTAINER_OF(thread, struct dpif_netdev_flow_dump_thread, up);
1451 }
1452
1453 static struct dpif_flow_dump_thread *
1454 dpif_netdev_flow_dump_thread_create(struct dpif_flow_dump *dump_)
1455 {
1456     struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_);
1457     struct dpif_netdev_flow_dump_thread *thread;
1458
1459     thread = xmalloc(sizeof *thread);
1460     dpif_flow_dump_thread_init(&thread->up, &dump->up);
1461     thread->dump = dump;
1462     return &thread->up;
1463 }
1464
1465 static void
1466 dpif_netdev_flow_dump_thread_destroy(struct dpif_flow_dump_thread *thread_)
1467 {
1468     struct dpif_netdev_flow_dump_thread *thread
1469         = dpif_netdev_flow_dump_thread_cast(thread_);
1470
1471     free(thread);
1472 }
1473
1474 /* XXX the caller must use 'actions' without quiescing */
1475 static int
1476 dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
1477                            struct dpif_flow *flows, int max_flows)
1478 {
1479     struct dpif_netdev_flow_dump_thread *thread
1480         = dpif_netdev_flow_dump_thread_cast(thread_);
1481     struct dpif_netdev_flow_dump *dump = thread->dump;
1482     struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
1483     struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
1484     struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
1485     int n_flows = 0;
1486     int i;
1487
1488     ovs_mutex_lock(&dump->mutex);
1489     if (!dump->status) {
1490         fat_rwlock_rdlock(&dp->cls.rwlock);
1491         for (n_flows = 0; n_flows < MIN(max_flows, FLOW_DUMP_MAX_BATCH);
1492              n_flows++) {
1493             struct hmap_node *node;
1494
1495             node = hmap_at_position(&dp->flow_table, &dump->bucket,
1496                                     &dump->offset);
1497             if (!node) {
1498                 dump->status = EOF;
1499                 break;
1500             }
1501             netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow,
1502                                                  node);
1503         }
1504         fat_rwlock_unlock(&dp->cls.rwlock);
1505     }
1506     ovs_mutex_unlock(&dump->mutex);
1507
1508     for (i = 0; i < n_flows; i++) {
1509         struct odputil_keybuf *maskbuf = &thread->maskbuf[i];
1510         struct odputil_keybuf *keybuf = &thread->keybuf[i];
1511         struct dp_netdev_flow *netdev_flow = netdev_flows[i];
1512         struct dpif_flow *f = &flows[i];
1513         struct dp_netdev_actions *dp_actions;
1514         struct flow_wildcards wc;
1515         struct ofpbuf buf;
1516
1517         minimask_expand(&netdev_flow->cr.match.mask, &wc);
1518
1519         /* Key. */
1520         ofpbuf_use_stack(&buf, keybuf, sizeof *keybuf);
1521         odp_flow_key_from_flow(&buf, &netdev_flow->flow, &wc.masks,
1522                                netdev_flow->flow.in_port.odp_port, true);
1523         f->key = ofpbuf_data(&buf);
1524         f->key_len = ofpbuf_size(&buf);
1525
1526         /* Mask. */
1527         ofpbuf_use_stack(&buf, maskbuf, sizeof *maskbuf);
1528         odp_flow_key_from_mask(&buf, &wc.masks, &netdev_flow->flow,
1529                                odp_to_u32(wc.masks.in_port.odp_port),
1530                                SIZE_MAX, true);
1531         f->mask = ofpbuf_data(&buf);
1532         f->mask_len = ofpbuf_size(&buf);
1533
1534         /* Actions. */
1535         dp_actions = dp_netdev_flow_get_actions(netdev_flow);
1536         f->actions = dp_actions->actions;
1537         f->actions_len = dp_actions->size;
1538
1539         /* Stats. */
1540         get_dpif_flow_stats(netdev_flow, &f->stats);
1541     }
1542
1543     return n_flows;
1544 }
1545
1546 static int
1547 dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
1548 {
1549     struct dp_netdev *dp = get_dp_netdev(dpif);
1550     struct dpif_packet packet, *pp;
1551     struct pkt_metadata *md = &execute->md;
1552
1553     if (ofpbuf_size(execute->packet) < ETH_HEADER_LEN ||
1554         ofpbuf_size(execute->packet) > UINT16_MAX) {
1555         return EINVAL;
1556     }
1557
1558     packet.ofpbuf = *execute->packet;
1559     pp = &packet;
1560
1561     dp_netdev_execute_actions(dp, &pp, 1, false, md,
1562                               execute->actions, execute->actions_len);
1563
1564     /* Even though may_steal is set to false, some actions could modify or
1565      * reallocate the ofpbuf memory. We need to pass those changes to the
1566      * caller */
1567     *execute->packet = packet.ofpbuf;
1568
1569     return 0;
1570 }
1571
1572 static void
1573 dp_netdev_destroy_all_queues(struct dp_netdev *dp)
1574     OVS_REQ_WRLOCK(dp->queue_rwlock)
1575 {
1576     size_t i;
1577
1578     dp_netdev_purge_queues(dp);
1579
1580     for (i = 0; i < dp->n_handlers; i++) {
1581         struct dp_netdev_queue *q = &dp->handler_queues[i];
1582
1583         ovs_mutex_destroy(&q->mutex);
1584         seq_destroy(q->seq);
1585     }
1586     free(dp->handler_queues);
1587     dp->handler_queues = NULL;
1588     dp->n_handlers = 0;
1589 }
1590
1591 static void
1592 dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
1593     OVS_REQ_WRLOCK(dp->queue_rwlock)
1594 {
1595     if (dp->n_handlers != n_handlers) {
1596         size_t i;
1597
1598         dp_netdev_destroy_all_queues(dp);
1599
1600         dp->n_handlers = n_handlers;
1601         dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
1602
1603         for (i = 0; i < n_handlers; i++) {
1604             struct dp_netdev_queue *q = &dp->handler_queues[i];
1605
1606             ovs_mutex_init(&q->mutex);
1607             q->seq = seq_create();
1608         }
1609     }
1610 }
1611
1612 static int
1613 dpif_netdev_recv_set(struct dpif *dpif, bool enable)
1614 {
1615     struct dp_netdev *dp = get_dp_netdev(dpif);
1616
1617     if ((dp->handler_queues != NULL) == enable) {
1618         return 0;
1619     }
1620
1621     fat_rwlock_wrlock(&dp->queue_rwlock);
1622     if (!enable) {
1623         dp_netdev_destroy_all_queues(dp);
1624     } else {
1625         dp_netdev_refresh_queues(dp, 1);
1626     }
1627     fat_rwlock_unlock(&dp->queue_rwlock);
1628
1629     return 0;
1630 }
1631
1632 static int
1633 dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
1634 {
1635     struct dp_netdev *dp = get_dp_netdev(dpif);
1636
1637     fat_rwlock_wrlock(&dp->queue_rwlock);
1638     if (dp->handler_queues) {
1639         dp_netdev_refresh_queues(dp, n_handlers);
1640     }
1641     fat_rwlock_unlock(&dp->queue_rwlock);
1642
1643     return 0;
1644 }
1645
1646 static int
1647 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
1648                               uint32_t queue_id, uint32_t *priority)
1649 {
1650     *priority = queue_id;
1651     return 0;
1652 }
1653
1654 static bool
1655 dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
1656     OVS_REQ_RDLOCK(dp->queue_rwlock)
1657 {
1658     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1659
1660     if (!dp->handler_queues) {
1661         VLOG_WARN_RL(&rl, "receiving upcall disabled");
1662         return false;
1663     }
1664
1665     if (handler_id >= dp->n_handlers) {
1666         VLOG_WARN_RL(&rl, "handler index out of bound");
1667         return false;
1668     }
1669
1670     return true;
1671 }
1672
1673 static int
1674 dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
1675                  struct dpif_upcall *upcall, struct ofpbuf *buf)
1676 {
1677     struct dp_netdev *dp = get_dp_netdev(dpif);
1678     struct dp_netdev_queue *q;
1679     int error = 0;
1680
1681     fat_rwlock_rdlock(&dp->queue_rwlock);
1682
1683     if (!dp_netdev_recv_check(dp, handler_id)) {
1684         error = EAGAIN;
1685         goto out;
1686     }
1687
1688     q = &dp->handler_queues[handler_id];
1689     ovs_mutex_lock(&q->mutex);
1690     if (q->head != q->tail) {
1691         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
1692
1693         *upcall = u->upcall;
1694
1695         ofpbuf_uninit(buf);
1696         *buf = u->buf;
1697     } else {
1698         error = EAGAIN;
1699     }
1700     ovs_mutex_unlock(&q->mutex);
1701
1702 out:
1703     fat_rwlock_unlock(&dp->queue_rwlock);
1704
1705     return error;
1706 }
1707
1708 static void
1709 dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
1710 {
1711     struct dp_netdev *dp = get_dp_netdev(dpif);
1712     struct dp_netdev_queue *q;
1713     uint64_t seq;
1714
1715     fat_rwlock_rdlock(&dp->queue_rwlock);
1716
1717     if (!dp_netdev_recv_check(dp, handler_id)) {
1718         goto out;
1719     }
1720
1721     q = &dp->handler_queues[handler_id];
1722     ovs_mutex_lock(&q->mutex);
1723     seq = seq_read(q->seq);
1724     if (q->head != q->tail) {
1725         poll_immediate_wake();
1726     } else {
1727         seq_wait(q->seq, seq);
1728     }
1729
1730     ovs_mutex_unlock(&q->mutex);
1731
1732 out:
1733     fat_rwlock_unlock(&dp->queue_rwlock);
1734 }
1735
1736 static void
1737 dpif_netdev_recv_purge(struct dpif *dpif)
1738 {
1739     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1740
1741     fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
1742     dp_netdev_purge_queues(dpif_netdev->dp);
1743     fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
1744 }
1745 \f
1746 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
1747  * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
1748  * 'ofpacts'. */
1749 struct dp_netdev_actions *
1750 dp_netdev_actions_create(const struct nlattr *actions, size_t size)
1751 {
1752     struct dp_netdev_actions *netdev_actions;
1753
1754     netdev_actions = xmalloc(sizeof *netdev_actions);
1755     netdev_actions->actions = xmemdup(actions, size);
1756     netdev_actions->size = size;
1757
1758     return netdev_actions;
1759 }
1760
1761 struct dp_netdev_actions *
1762 dp_netdev_flow_get_actions(const struct dp_netdev_flow *flow)
1763 {
1764     return ovsrcu_get(struct dp_netdev_actions *, &flow->actions);
1765 }
1766
1767 static void
1768 dp_netdev_actions_free(struct dp_netdev_actions *actions)
1769 {
1770     free(actions->actions);
1771     free(actions);
1772 }
1773 \f
1774
1775 static void
1776 dp_netdev_process_rxq_port(struct dp_netdev *dp,
1777                           struct dp_netdev_port *port,
1778                           struct netdev_rxq *rxq)
1779 {
1780     struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
1781     int error, cnt;
1782
1783     error = netdev_rxq_recv(rxq, packets, &cnt);
1784     if (!error) {
1785         dp_netdev_port_input(dp, packets, cnt, port->port_no);
1786     } else if (error != EAGAIN && error != EOPNOTSUPP) {
1787         static struct vlog_rate_limit rl
1788             = VLOG_RATE_LIMIT_INIT(1, 5);
1789
1790         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
1791                     netdev_get_name(port->netdev),
1792                     ovs_strerror(error));
1793     }
1794 }
1795
1796 static void
1797 dpif_netdev_run(struct dpif *dpif)
1798 {
1799     struct dp_netdev_port *port;
1800     struct dp_netdev *dp = get_dp_netdev(dpif);
1801
1802     CMAP_FOR_EACH (port, node, &dp->ports) {
1803         if (!netdev_is_pmd(port->netdev)) {
1804             int i;
1805
1806             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1807                 dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
1808             }
1809         }
1810     }
1811 }
1812
1813 static void
1814 dpif_netdev_wait(struct dpif *dpif)
1815 {
1816     struct dp_netdev_port *port;
1817     struct dp_netdev *dp = get_dp_netdev(dpif);
1818
1819     ovs_mutex_lock(&dp_netdev_mutex);
1820     CMAP_FOR_EACH (port, node, &dp->ports) {
1821         if (!netdev_is_pmd(port->netdev)) {
1822             int i;
1823
1824             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1825                 netdev_rxq_wait(port->rxq[i]);
1826             }
1827         }
1828     }
1829     ovs_mutex_unlock(&dp_netdev_mutex);
1830 }
1831
1832 struct rxq_poll {
1833     struct dp_netdev_port *port;
1834     struct netdev_rxq *rx;
1835 };
1836
1837 static int
1838 pmd_load_queues(struct pmd_thread *f,
1839                 struct rxq_poll **ppoll_list, int poll_cnt)
1840 {
1841     struct dp_netdev *dp = f->dp;
1842     struct rxq_poll *poll_list = *ppoll_list;
1843     struct dp_netdev_port *port;
1844     int id = f->id;
1845     int index;
1846     int i;
1847
1848     /* Simple scheduler for netdev rx polling. */
1849     for (i = 0; i < poll_cnt; i++) {
1850          port_unref(poll_list[i].port);
1851     }
1852
1853     poll_cnt = 0;
1854     index = 0;
1855
1856     CMAP_FOR_EACH (port, node, &f->dp->ports) {
1857         if (netdev_is_pmd(port->netdev)) {
1858             int i;
1859
1860             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1861                 if ((index % dp->n_pmd_threads) == id) {
1862                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
1863
1864                     port_ref(port);
1865                     poll_list[poll_cnt].port = port;
1866                     poll_list[poll_cnt].rx = port->rxq[i];
1867                     poll_cnt++;
1868                 }
1869                 index++;
1870             }
1871         }
1872     }
1873
1874     *ppoll_list = poll_list;
1875     return poll_cnt;
1876 }
1877
1878 static void *
1879 pmd_thread_main(void *f_)
1880 {
1881     struct pmd_thread *f = f_;
1882     struct dp_netdev *dp = f->dp;
1883     unsigned int lc = 0;
1884     struct rxq_poll *poll_list;
1885     unsigned int port_seq;
1886     int poll_cnt;
1887     int i;
1888
1889     poll_cnt = 0;
1890     poll_list = NULL;
1891
1892     pmd_thread_setaffinity_cpu(f->id);
1893 reload:
1894     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
1895     atomic_read(&f->change_seq, &port_seq);
1896
1897     for (;;) {
1898         unsigned int c_port_seq;
1899         int i;
1900
1901         for (i = 0; i < poll_cnt; i++) {
1902             dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
1903         }
1904
1905         if (lc++ > 1024) {
1906             ovsrcu_quiesce();
1907
1908             /* TODO: need completely userspace based signaling method.
1909              * to keep this thread entirely in userspace.
1910              * For now using atomic counter. */
1911             lc = 0;
1912             atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
1913             if (c_port_seq != port_seq) {
1914                 break;
1915             }
1916         }
1917     }
1918
1919     if (!latch_is_set(&f->dp->exit_latch)){
1920         goto reload;
1921     }
1922
1923     for (i = 0; i < poll_cnt; i++) {
1924          port_unref(poll_list[i].port);
1925     }
1926
1927     free(poll_list);
1928     return NULL;
1929 }
1930
1931 static void
1932 dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
1933 {
1934     int i;
1935
1936     if (n == dp->n_pmd_threads) {
1937         return;
1938     }
1939
1940     /* Stop existing threads. */
1941     latch_set(&dp->exit_latch);
1942     dp_netdev_reload_pmd_threads(dp);
1943     for (i = 0; i < dp->n_pmd_threads; i++) {
1944         struct pmd_thread *f = &dp->pmd_threads[i];
1945
1946         xpthread_join(f->thread, NULL);
1947     }
1948     latch_poll(&dp->exit_latch);
1949     free(dp->pmd_threads);
1950
1951     /* Start new threads. */
1952     dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
1953     dp->n_pmd_threads = n;
1954
1955     for (i = 0; i < n; i++) {
1956         struct pmd_thread *f = &dp->pmd_threads[i];
1957
1958         f->dp = dp;
1959         f->id = i;
1960         atomic_store(&f->change_seq, 1);
1961
1962         /* Each thread will distribute all devices rx-queues among
1963          * themselves. */
1964         f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
1965     }
1966 }
1967
1968 \f
1969 static void *
1970 dp_netdev_flow_stats_new_cb(void)
1971 {
1972     struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
1973     ovs_mutex_init(&bucket->mutex);
1974     return bucket;
1975 }
1976
1977 static void
1978 dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
1979                     int cnt, int size,
1980                     uint16_t tcp_flags)
1981 {
1982     long long int now = time_msec();
1983     struct dp_netdev_flow_stats *bucket;
1984
1985     bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
1986                                         dp_netdev_flow_stats_new_cb);
1987
1988     ovs_mutex_lock(&bucket->mutex);
1989     bucket->used = MAX(now, bucket->used);
1990     bucket->packet_count += cnt;
1991     bucket->byte_count += size;
1992     bucket->tcp_flags |= tcp_flags;
1993     ovs_mutex_unlock(&bucket->mutex);
1994 }
1995
1996 static void *
1997 dp_netdev_stats_new_cb(void)
1998 {
1999     struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
2000     ovs_mutex_init(&bucket->mutex);
2001     return bucket;
2002 }
2003
2004 static void
2005 dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt)
2006 {
2007     struct dp_netdev_stats *bucket;
2008
2009     bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
2010     ovs_mutex_lock(&bucket->mutex);
2011     bucket->n[type] += cnt;
2012     ovs_mutex_unlock(&bucket->mutex);
2013 }
2014
2015 struct packet_batch {
2016     unsigned int packet_count;
2017     unsigned int byte_count;
2018     uint16_t tcp_flags;
2019
2020     struct dp_netdev_flow *flow;
2021
2022     struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
2023     struct pkt_metadata md;
2024 };
2025
2026 static inline void
2027 packet_batch_update(struct packet_batch *batch,
2028                     struct dpif_packet *packet, const struct miniflow *mf)
2029 {
2030     batch->tcp_flags |= miniflow_get_tcp_flags(mf);
2031     batch->packets[batch->packet_count++] = packet;
2032     batch->byte_count += ofpbuf_size(&packet->ofpbuf);
2033 }
2034
2035 static inline void
2036 packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow,
2037                   struct pkt_metadata *md)
2038 {
2039     batch->flow = flow;
2040     batch->md = *md;
2041
2042     batch->packet_count = 0;
2043     batch->byte_count = 0;
2044     batch->tcp_flags = 0;
2045 }
2046
2047 static inline void
2048 packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp)
2049 {
2050     struct dp_netdev_actions *actions;
2051     struct dp_netdev_flow *flow = batch->flow;
2052
2053     dp_netdev_flow_used(batch->flow, batch->packet_count, batch->byte_count,
2054                         batch->tcp_flags);
2055
2056     actions = dp_netdev_flow_get_actions(flow);
2057
2058     dp_netdev_execute_actions(dp, batch->packets,
2059                               batch->packet_count, true, &batch->md,
2060                               actions->actions, actions->size);
2061
2062     dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
2063 }
2064
2065 static void
2066 dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
2067                 struct pkt_metadata *md)
2068 {
2069     struct packet_batch batches[NETDEV_MAX_RX_BATCH];
2070     struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
2071     const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
2072     struct cls_rule *rules[NETDEV_MAX_RX_BATCH];
2073     size_t n_batches, i;
2074
2075     for (i = 0; i < cnt; i++) {
2076         if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) {
2077             dpif_packet_delete(packets[i]);
2078             mfs[i] = NULL;
2079             continue;
2080         }
2081
2082         miniflow_initialize(&keys[i].flow, keys[i].buf);
2083         miniflow_extract(&packets[i]->ofpbuf, md, &keys[i].flow);
2084         mfs[i] = &keys[i].flow;
2085     }
2086
2087     fat_rwlock_rdlock(&dp->cls.rwlock);
2088     classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
2089     fat_rwlock_unlock(&dp->cls.rwlock);
2090
2091     n_batches = 0;
2092     for (i = 0; i < cnt; i++) {
2093         struct dp_netdev_flow *flow;
2094         struct packet_batch *batch;
2095         size_t j;
2096
2097         if (OVS_UNLIKELY(!mfs[i])) {
2098             continue;
2099         }
2100
2101         if (OVS_UNLIKELY(!rules[i])) {
2102             dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
2103             if (OVS_LIKELY(dp->handler_queues)) {
2104                 uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
2105                 struct ofpbuf *buf = &packets[i]->ofpbuf;
2106
2107                 dp_netdev_output_userspace(dp, &buf, 1, hash % dp->n_handlers,
2108                                            DPIF_UC_MISS, mfs[i], NULL);
2109             } else {
2110                 /* No upcall queue.  Freeing the packet */
2111                 dpif_packet_delete(packets[i]);
2112             }
2113             continue;
2114         }
2115
2116         /* XXX: This O(n^2) algortihm makes sense if we're operating under the
2117          * assumption that the number of distinct flows (and therefore the
2118          * number of distinct batches) is quite small.  If this turns out not
2119          * to be the case, it may make sense to pre sort based on the
2120          * netdev_flow pointer.  That done we can get the appropriate batching
2121          * in O(n * log(n)) instead. */
2122         batch = NULL;
2123         flow = dp_netdev_flow_cast(rules[i]);
2124         for (j = 0; j < n_batches; j++) {
2125             if (batches[j].flow == flow) {
2126                 batch = &batches[j];
2127                 break;
2128             }
2129         }
2130
2131         if (!batch) {
2132             batch = &batches[n_batches++];
2133             packet_batch_init(batch, flow, md);
2134         }
2135         packet_batch_update(batch, packets[i], mfs[i]);
2136     }
2137
2138     for (i = 0; i < n_batches; i++) {
2139         packet_batch_execute(&batches[i], dp);
2140     }
2141 }
2142
2143 static void
2144 dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets,
2145                      int cnt, odp_port_t port_no)
2146 {
2147     uint32_t *recirc_depth = recirc_depth_get();
2148     struct pkt_metadata md = PKT_METADATA_INITIALIZER(port_no);
2149
2150     *recirc_depth = 0;
2151     dp_netdev_input(dp, packets, cnt, &md);
2152 }
2153
2154 static int
2155 dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
2156                                  struct ofpbuf *packet, int type,
2157                                  const struct miniflow *key,
2158                                  const struct nlattr *userdata)
2159 OVS_REQUIRES(q->mutex)
2160 {
2161     if (q->head - q->tail < MAX_QUEUE_LEN) {
2162         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
2163         struct dpif_upcall *upcall = &u->upcall;
2164         struct ofpbuf *buf = &u->buf;
2165         size_t buf_size;
2166         struct flow flow;
2167
2168         upcall->type = type;
2169
2170         /* Allocate buffer big enough for everything. */
2171         buf_size = ODPUTIL_FLOW_KEY_BYTES;
2172         if (userdata) {
2173             buf_size += NLA_ALIGN(userdata->nla_len);
2174         }
2175         ofpbuf_init(buf, buf_size);
2176
2177         /* Put ODP flow. */
2178         miniflow_expand(key, &flow);
2179         odp_flow_key_from_flow(buf, &flow, NULL, flow.in_port.odp_port, true);
2180         upcall->key = ofpbuf_data(buf);
2181         upcall->key_len = ofpbuf_size(buf);
2182
2183         /* Put userdata. */
2184         if (userdata) {
2185             upcall->userdata = ofpbuf_put(buf, userdata,
2186                     NLA_ALIGN(userdata->nla_len));
2187         }
2188
2189         upcall->packet = *packet;
2190
2191         seq_change(q->seq);
2192
2193         return 0;
2194     } else {
2195         ofpbuf_delete(packet);
2196         return ENOBUFS;
2197     }
2198
2199 }
2200
2201 static int
2202 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf **packets,
2203                            int cnt, int queue_no, int type,
2204                            const struct miniflow *key,
2205                            const struct nlattr *userdata)
2206 {
2207     struct dp_netdev_queue *q;
2208     int error;
2209     int i;
2210
2211     fat_rwlock_rdlock(&dp->queue_rwlock);
2212     q = &dp->handler_queues[queue_no];
2213     ovs_mutex_lock(&q->mutex);
2214     for (i = 0; i < cnt; i++) {
2215         struct ofpbuf *packet = packets[i];
2216
2217         error = dp_netdev_queue_userspace_packet(q, packet, type, key,
2218                                                  userdata);
2219         if (error == ENOBUFS) {
2220             dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
2221         }
2222     }
2223     ovs_mutex_unlock(&q->mutex);
2224     fat_rwlock_unlock(&dp->queue_rwlock);
2225
2226     return error;
2227 }
2228
2229 struct dp_netdev_execute_aux {
2230     struct dp_netdev *dp;
2231 };
2232
2233 static void
2234 dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
2235               struct pkt_metadata *md,
2236               const struct nlattr *a, bool may_steal)
2237     OVS_NO_THREAD_SAFETY_ANALYSIS
2238 {
2239     struct dp_netdev_execute_aux *aux = aux_;
2240     int type = nl_attr_type(a);
2241     struct dp_netdev_port *p;
2242     uint32_t *depth = recirc_depth_get();
2243     int i;
2244
2245     switch ((enum ovs_action_attr)type) {
2246     case OVS_ACTION_ATTR_OUTPUT:
2247         p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
2248         if (OVS_LIKELY(p)) {
2249             netdev_send(p->netdev, packets, cnt, may_steal);
2250         } else if (may_steal) {
2251             for (i = 0; i < cnt; i++) {
2252                 dpif_packet_delete(packets[i]);
2253             }
2254         }
2255         break;
2256
2257     case OVS_ACTION_ATTR_USERSPACE: {
2258         const struct nlattr *userdata;
2259         struct netdev_flow_key key;
2260
2261         userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
2262
2263         miniflow_initialize(&key.flow, key.buf);
2264
2265         for (i = 0; i < cnt; i++) {
2266             struct ofpbuf *packet, *userspace_packet;
2267
2268             packet = &packets[i]->ofpbuf;
2269
2270             miniflow_extract(packet, md, &key.flow);
2271
2272             userspace_packet = may_steal ? packet : ofpbuf_clone(packet);
2273
2274             dp_netdev_output_userspace(aux->dp, &userspace_packet, 1,
2275                                        miniflow_hash_5tuple(&key.flow, 0)
2276                                            % aux->dp->n_handlers,
2277                                        DPIF_UC_ACTION, &key.flow,
2278                                        userdata);
2279         }
2280         break;
2281     }
2282
2283     case OVS_ACTION_ATTR_HASH: {
2284         const struct ovs_action_hash *hash_act;
2285         struct netdev_flow_key key;
2286         uint32_t hash;
2287
2288         hash_act = nl_attr_get(a);
2289
2290         miniflow_initialize(&key.flow, key.buf);
2291
2292         for (i = 0; i < cnt; i++) {
2293
2294             /* TODO: this is slow. Use RSS hash in the future */
2295             miniflow_extract(&packets[i]->ofpbuf, md, &key.flow);
2296
2297             if (hash_act->hash_alg == OVS_HASH_ALG_L4) {
2298                 /* Hash need not be symmetric, nor does it need to include
2299                  * L2 fields. */
2300                 hash = miniflow_hash_5tuple(&key.flow, hash_act->hash_basis);
2301             } else {
2302                 VLOG_WARN("Unknown hash algorithm specified "
2303                           "for the hash action.");
2304                 hash = 2;
2305             }
2306
2307             if (!hash) {
2308                 hash = 1; /* 0 is not valid */
2309             }
2310
2311             if (i == 0) {
2312                 md->dp_hash = hash;
2313             }
2314             packets[i]->dp_hash = hash;
2315         }
2316         break;
2317     }
2318
2319     case OVS_ACTION_ATTR_RECIRC:
2320         if (*depth < MAX_RECIRC_DEPTH) {
2321
2322             (*depth)++;
2323             for (i = 0; i < cnt; i++) {
2324                 struct dpif_packet *recirc_pkt;
2325                 struct pkt_metadata recirc_md = *md;
2326
2327                 recirc_pkt = (may_steal) ? packets[i]
2328                                     : dpif_packet_clone(packets[i]);
2329
2330                 recirc_md.recirc_id = nl_attr_get_u32(a);
2331
2332                 /* Hash is private to each packet */
2333                 recirc_md.dp_hash = packets[i]->dp_hash;
2334
2335                 dp_netdev_input(aux->dp, &recirc_pkt, 1, &recirc_md);
2336             }
2337             (*depth)--;
2338
2339             break;
2340         } else {
2341             VLOG_WARN("Packet dropped. Max recirculation depth exceeded.");
2342             if (may_steal) {
2343                 for (i = 0; i < cnt; i++) {
2344                     dpif_packet_delete(packets[i]);
2345                 }
2346             }
2347         }
2348         break;
2349
2350     case OVS_ACTION_ATTR_PUSH_VLAN:
2351     case OVS_ACTION_ATTR_POP_VLAN:
2352     case OVS_ACTION_ATTR_PUSH_MPLS:
2353     case OVS_ACTION_ATTR_POP_MPLS:
2354     case OVS_ACTION_ATTR_SET:
2355     case OVS_ACTION_ATTR_SAMPLE:
2356     case OVS_ACTION_ATTR_UNSPEC:
2357     case __OVS_ACTION_ATTR_MAX:
2358         OVS_NOT_REACHED();
2359     }
2360 }
2361
2362 static void
2363 dp_netdev_execute_actions(struct dp_netdev *dp,
2364                           struct dpif_packet **packets, int cnt,
2365                           bool may_steal, struct pkt_metadata *md,
2366                           const struct nlattr *actions, size_t actions_len)
2367 {
2368     struct dp_netdev_execute_aux aux = {dp};
2369
2370     odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
2371                         actions_len, dp_execute_cb);
2372 }
2373
2374 const struct dpif_class dpif_netdev_class = {
2375     "netdev",
2376     dpif_netdev_enumerate,
2377     dpif_netdev_port_open_type,
2378     dpif_netdev_open,
2379     dpif_netdev_close,
2380     dpif_netdev_destroy,
2381     dpif_netdev_run,
2382     dpif_netdev_wait,
2383     dpif_netdev_get_stats,
2384     dpif_netdev_port_add,
2385     dpif_netdev_port_del,
2386     dpif_netdev_port_query_by_number,
2387     dpif_netdev_port_query_by_name,
2388     NULL,                       /* port_get_pid */
2389     dpif_netdev_port_dump_start,
2390     dpif_netdev_port_dump_next,
2391     dpif_netdev_port_dump_done,
2392     dpif_netdev_port_poll,
2393     dpif_netdev_port_poll_wait,
2394     dpif_netdev_flow_get,
2395     dpif_netdev_flow_put,
2396     dpif_netdev_flow_del,
2397     dpif_netdev_flow_flush,
2398     dpif_netdev_flow_dump_create,
2399     dpif_netdev_flow_dump_destroy,
2400     dpif_netdev_flow_dump_thread_create,
2401     dpif_netdev_flow_dump_thread_destroy,
2402     dpif_netdev_flow_dump_next,
2403     dpif_netdev_execute,
2404     NULL,                       /* operate */
2405     dpif_netdev_recv_set,
2406     dpif_netdev_handlers_set,
2407     dpif_netdev_queue_to_priority,
2408     dpif_netdev_recv,
2409     dpif_netdev_recv_wait,
2410     dpif_netdev_recv_purge,
2411 };
2412
2413 static void
2414 dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
2415                               const char *argv[], void *aux OVS_UNUSED)
2416 {
2417     struct dp_netdev_port *old_port;
2418     struct dp_netdev_port *new_port;
2419     struct dp_netdev *dp;
2420     odp_port_t port_no;
2421
2422     ovs_mutex_lock(&dp_netdev_mutex);
2423     dp = shash_find_data(&dp_netdevs, argv[1]);
2424     if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
2425         ovs_mutex_unlock(&dp_netdev_mutex);
2426         unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
2427         return;
2428     }
2429     ovs_refcount_ref(&dp->ref_cnt);
2430     ovs_mutex_unlock(&dp_netdev_mutex);
2431
2432     ovs_mutex_lock(&dp->port_mutex);
2433     if (get_port_by_name(dp, argv[2], &old_port)) {
2434         unixctl_command_reply_error(conn, "unknown port");
2435         goto exit;
2436     }
2437
2438     port_no = u32_to_odp(atoi(argv[3]));
2439     if (!port_no || port_no == ODPP_NONE) {
2440         unixctl_command_reply_error(conn, "bad port number");
2441         goto exit;
2442     }
2443     if (dp_netdev_lookup_port(dp, port_no)) {
2444         unixctl_command_reply_error(conn, "port number already in use");
2445         goto exit;
2446     }
2447
2448     /* Remove old port. */
2449     cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no));
2450     ovsrcu_postpone(free, old_port);
2451
2452     /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */
2453     new_port = xmemdup(old_port, sizeof *old_port);
2454     new_port->port_no = port_no;
2455     cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no));
2456
2457     seq_change(dp->port_seq);
2458     unixctl_command_reply(conn, NULL);
2459
2460 exit:
2461     ovs_mutex_unlock(&dp->port_mutex);
2462     dp_netdev_unref(dp);
2463 }
2464
2465 static void
2466 dpif_dummy_delete_port(struct unixctl_conn *conn, int argc OVS_UNUSED,
2467                        const char *argv[], void *aux OVS_UNUSED)
2468 {
2469     struct dp_netdev_port *port;
2470     struct dp_netdev *dp;
2471
2472     ovs_mutex_lock(&dp_netdev_mutex);
2473     dp = shash_find_data(&dp_netdevs, argv[1]);
2474     if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
2475         ovs_mutex_unlock(&dp_netdev_mutex);
2476         unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
2477         return;
2478     }
2479     ovs_refcount_ref(&dp->ref_cnt);
2480     ovs_mutex_unlock(&dp_netdev_mutex);
2481
2482     ovs_mutex_lock(&dp->port_mutex);
2483     if (get_port_by_name(dp, argv[2], &port)) {
2484         unixctl_command_reply_error(conn, "unknown port");
2485     } else if (port->port_no == ODPP_LOCAL) {
2486         unixctl_command_reply_error(conn, "can't delete local port");
2487     } else {
2488         do_del_port(dp, port);
2489         unixctl_command_reply(conn, NULL);
2490     }
2491     ovs_mutex_unlock(&dp->port_mutex);
2492
2493     dp_netdev_unref(dp);
2494 }
2495
2496 static void
2497 dpif_dummy_register__(const char *type)
2498 {
2499     struct dpif_class *class;
2500
2501     class = xmalloc(sizeof *class);
2502     *class = dpif_netdev_class;
2503     class->type = xstrdup(type);
2504     dp_register_provider(class);
2505 }
2506
2507 void
2508 dpif_dummy_register(bool override)
2509 {
2510     if (override) {
2511         struct sset types;
2512         const char *type;
2513
2514         sset_init(&types);
2515         dp_enumerate_types(&types);
2516         SSET_FOR_EACH (type, &types) {
2517             if (!dp_unregister_provider(type)) {
2518                 dpif_dummy_register__(type);
2519             }
2520         }
2521         sset_destroy(&types);
2522     }
2523
2524     dpif_dummy_register__("dummy");
2525
2526     unixctl_command_register("dpif-dummy/change-port-number",
2527                              "DP PORT NEW-NUMBER",
2528                              3, 3, dpif_dummy_change_port_number, NULL);
2529     unixctl_command_register("dpif-dummy/delete-port", "DP PORT",
2530                              2, 2, dpif_dummy_delete_port, NULL);
2531 }