netdev: netdev_send accepts multiple packets
[cascardo/ovs.git] / lib / dpif-netdev.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "dpif.h"
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <netinet/in.h>
25 #include <sys/socket.h>
26 #include <net/if.h>
27 #include <stdint.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33
34 #include "classifier.h"
35 #include "cmap.h"
36 #include "csum.h"
37 #include "dpif.h"
38 #include "dpif-provider.h"
39 #include "dummy.h"
40 #include "dynamic-string.h"
41 #include "flow.h"
42 #include "hmap.h"
43 #include "latch.h"
44 #include "list.h"
45 #include "meta-flow.h"
46 #include "netdev.h"
47 #include "netdev-dpdk.h"
48 #include "netdev-vport.h"
49 #include "netlink.h"
50 #include "odp-execute.h"
51 #include "odp-util.h"
52 #include "ofp-print.h"
53 #include "ofpbuf.h"
54 #include "ovs-rcu.h"
55 #include "packet-dpif.h"
56 #include "packets.h"
57 #include "poll-loop.h"
58 #include "random.h"
59 #include "seq.h"
60 #include "shash.h"
61 #include "sset.h"
62 #include "timeval.h"
63 #include "unixctl.h"
64 #include "util.h"
65 #include "vlog.h"
66
67 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
68
69 /* By default, choose a priority in the middle. */
70 #define NETDEV_RULE_PRIORITY 0x8000
71
72 #define NR_THREADS 1
73 /* Use per thread recirc_depth to prevent recirculation loop. */
74 #define MAX_RECIRC_DEPTH 5
75 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
76
77 /* Configuration parameters. */
78 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
79
80 /* Queues. */
81 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
82 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
83 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
84
85 /* Protects against changes to 'dp_netdevs'. */
86 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
87
88 /* Contains all 'struct dp_netdev's. */
89 static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
90     = SHASH_INITIALIZER(&dp_netdevs);
91
92 struct dp_netdev_upcall {
93     struct dpif_upcall upcall;  /* Queued upcall information. */
94     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
95 };
96
97 /* A queue passing packets from a struct dp_netdev to its clients (handlers).
98  *
99  *
100  * Thread-safety
101  * =============
102  *
103  * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
104  * its own mutex. */
105 struct dp_netdev_queue {
106     struct ovs_mutex mutex;
107     struct seq *seq;      /* Incremented whenever a packet is queued. */
108     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
109     unsigned int head OVS_GUARDED;
110     unsigned int tail OVS_GUARDED;
111 };
112
113 /* Datapath based on the network device interface from netdev.h.
114  *
115  *
116  * Thread-safety
117  * =============
118  *
119  * Some members, marked 'const', are immutable.  Accessing other members
120  * requires synchronization, as noted in more detail below.
121  *
122  * Acquisition order is, from outermost to innermost:
123  *
124  *    dp_netdev_mutex (global)
125  *    port_mutex
126  *    flow_mutex
127  *    cls.rwlock
128  *    queue_rwlock
129  */
130 struct dp_netdev {
131     const struct dpif_class *const class;
132     const char *const name;
133     struct ovs_refcount ref_cnt;
134     atomic_flag destroyed;
135
136     /* Flows.
137      *
138      * Readers of 'cls' and 'flow_table' must take a 'cls->rwlock' read lock.
139      *
140      * Writers of 'cls' and 'flow_table' must take the 'flow_mutex' and then
141      * the 'cls->rwlock' write lock.  (The outer 'flow_mutex' allows writers to
142      * atomically perform multiple operations on 'cls' and 'flow_table'.)
143      */
144     struct ovs_mutex flow_mutex;
145     struct classifier cls;      /* Classifier.  Protected by cls.rwlock. */
146     struct hmap flow_table OVS_GUARDED; /* Flow table. */
147
148     /* Queues.
149      *
150      * 'queue_rwlock' protects the modification of 'handler_queues' and
151      * 'n_handlers'.  The queue elements are protected by its
152      * 'handler_queues''s mutex. */
153     struct fat_rwlock queue_rwlock;
154     struct dp_netdev_queue *handler_queues;
155     uint32_t n_handlers;
156
157     /* Statistics.
158      *
159      * ovsthread_stats is internally synchronized. */
160     struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
161
162     /* Ports.
163      *
164      * Protected by RCU.  Take the mutex to add or remove ports. */
165     struct ovs_mutex port_mutex;
166     struct cmap ports;
167     struct seq *port_seq;       /* Incremented whenever a port changes. */
168
169     /* Forwarding threads. */
170     struct latch exit_latch;
171     struct pmd_thread *pmd_threads;
172     size_t n_pmd_threads;
173     int pmd_count;
174 };
175
176 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
177                                                     odp_port_t);
178
179 enum dp_stat_type {
180     DP_STAT_HIT,                /* Packets that matched in the flow table. */
181     DP_STAT_MISS,               /* Packets that did not match. */
182     DP_STAT_LOST,               /* Packets not passed up to the client. */
183     DP_N_STATS
184 };
185
186 /* Contained by struct dp_netdev's 'stats' member.  */
187 struct dp_netdev_stats {
188     struct ovs_mutex mutex;          /* Protects 'n'. */
189
190     /* Indexed by DP_STAT_*, protected by 'mutex'. */
191     unsigned long long int n[DP_N_STATS] OVS_GUARDED;
192 };
193
194
195 /* A port in a netdev-based datapath. */
196 struct dp_netdev_port {
197     struct cmap_node node;      /* Node in dp_netdev's 'ports'. */
198     odp_port_t port_no;
199     struct netdev *netdev;
200     struct netdev_saved_flags *sf;
201     struct netdev_rxq **rxq;
202     struct ovs_refcount ref_cnt;
203     char *type;                 /* Port type as requested by user. */
204 };
205
206 /* A flow in dp_netdev's 'flow_table'.
207  *
208  *
209  * Thread-safety
210  * =============
211  *
212  * Except near the beginning or ending of its lifespan, rule 'rule' belongs to
213  * its dp_netdev's classifier.  The text below calls this classifier 'cls'.
214  *
215  * Motivation
216  * ----------
217  *
218  * The thread safety rules described here for "struct dp_netdev_flow" are
219  * motivated by two goals:
220  *
221  *    - Prevent threads that read members of "struct dp_netdev_flow" from
222  *      reading bad data due to changes by some thread concurrently modifying
223  *      those members.
224  *
225  *    - Prevent two threads making changes to members of a given "struct
226  *      dp_netdev_flow" from interfering with each other.
227  *
228  *
229  * Rules
230  * -----
231  *
232  * A flow 'flow' may be accessed without a risk of being freed by code that
233  * holds a read-lock or write-lock on 'cls->rwlock' or that owns a reference to
234  * 'flow->ref_cnt' (or both).  Code that needs to hold onto a flow for a while
235  * should take 'cls->rwlock', find the flow it needs, increment 'flow->ref_cnt'
236  * with dpif_netdev_flow_ref(), and drop 'cls->rwlock'.
237  *
238  * 'flow->ref_cnt' protects 'flow' from being freed.  It doesn't protect the
239  * flow from being deleted from 'cls' (that's 'cls->rwlock') and it doesn't
240  * protect members of 'flow' from modification.
241  *
242  * Some members, marked 'const', are immutable.  Accessing other members
243  * requires synchronization, as noted in more detail below.
244  */
245 struct dp_netdev_flow {
246     /* Packet classification. */
247     const struct cls_rule cr;   /* In owning dp_netdev's 'cls'. */
248
249     /* Hash table index by unmasked flow. */
250     const struct hmap_node node; /* In owning dp_netdev's 'flow_table'. */
251     const struct flow flow;      /* The flow that created this entry. */
252
253     /* Statistics.
254      *
255      * Reading or writing these members requires 'mutex'. */
256     struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
257
258     /* Actions. */
259     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
260 };
261
262 static void dp_netdev_flow_free(struct dp_netdev_flow *);
263
264 /* Contained by struct dp_netdev_flow's 'stats' member.  */
265 struct dp_netdev_flow_stats {
266     struct ovs_mutex mutex;         /* Guards all the other members. */
267
268     long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
269     long long int packet_count OVS_GUARDED; /* Number of packets matched. */
270     long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
271     uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
272 };
273
274 /* A set of datapath actions within a "struct dp_netdev_flow".
275  *
276  *
277  * Thread-safety
278  * =============
279  *
280  * A struct dp_netdev_actions 'actions' is protected with RCU. */
281 struct dp_netdev_actions {
282     /* These members are immutable: they do not change during the struct's
283      * lifetime.  */
284     struct nlattr *actions;     /* Sequence of OVS_ACTION_ATTR_* attributes. */
285     unsigned int size;          /* Size of 'actions', in bytes. */
286 };
287
288 struct dp_netdev_actions *dp_netdev_actions_create(const struct nlattr *,
289                                                    size_t);
290 struct dp_netdev_actions *dp_netdev_flow_get_actions(
291     const struct dp_netdev_flow *);
292 static void dp_netdev_actions_free(struct dp_netdev_actions *);
293
294 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
295  * the performance overhead of interrupt processing.  Therefore netdev can
296  * not implement rx-wait for these devices.  dpif-netdev needs to poll
297  * these device to check for recv buffer.  pmd-thread does polling for
298  * devices assigned to itself thread.
299  *
300  * DPDK used PMD for accessing NIC.
301  *
302  * A thread that receives packets from PMD ports, looks them up in the flow
303  * table, and executes the actions it finds.
304  **/
305 struct pmd_thread {
306     struct dp_netdev *dp;
307     pthread_t thread;
308     int id;
309     atomic_uint change_seq;
310 };
311
312 /* Interface to netdev-based datapath. */
313 struct dpif_netdev {
314     struct dpif dpif;
315     struct dp_netdev *dp;
316     uint64_t last_port_seq;
317 };
318
319 static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
320                               struct dp_netdev_port **portp);
321 static int get_port_by_name(struct dp_netdev *dp, const char *devname,
322                             struct dp_netdev_port **portp);
323 static void dp_netdev_free(struct dp_netdev *)
324     OVS_REQUIRES(dp_netdev_mutex);
325 static void dp_netdev_flow_flush(struct dp_netdev *);
326 static int do_add_port(struct dp_netdev *dp, const char *devname,
327                        const char *type, odp_port_t port_no)
328     OVS_REQUIRES(dp->port_mutex);
329 static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
330     OVS_REQUIRES(dp->port_mutex);
331 static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
332     OVS_REQ_WRLOCK(dp->queue_rwlock);
333 static int dpif_netdev_open(const struct dpif_class *, const char *name,
334                             bool create, struct dpif **);
335 static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *,
336                                       int queue_no, int type,
337                                       const struct miniflow *,
338                                       const struct nlattr *userdata);
339 static void dp_netdev_execute_actions(struct dp_netdev *dp,
340                                       const struct miniflow *,
341                                       struct dpif_packet *, bool may_steal,
342                                       struct pkt_metadata *,
343                                       const struct nlattr *actions,
344                                       size_t actions_len);
345 static void dp_netdev_port_input(struct dp_netdev *dp,
346                                  struct dpif_packet *packet,
347                                  struct pkt_metadata *);
348
349 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
350
351 static struct dpif_netdev *
352 dpif_netdev_cast(const struct dpif *dpif)
353 {
354     ovs_assert(dpif->dpif_class->open == dpif_netdev_open);
355     return CONTAINER_OF(dpif, struct dpif_netdev, dpif);
356 }
357
358 static struct dp_netdev *
359 get_dp_netdev(const struct dpif *dpif)
360 {
361     return dpif_netdev_cast(dpif)->dp;
362 }
363
364 static int
365 dpif_netdev_enumerate(struct sset *all_dps,
366                       const struct dpif_class *dpif_class)
367 {
368     struct shash_node *node;
369
370     ovs_mutex_lock(&dp_netdev_mutex);
371     SHASH_FOR_EACH(node, &dp_netdevs) {
372         struct dp_netdev *dp = node->data;
373         if (dpif_class != dp->class) {
374             /* 'dp_netdevs' contains both "netdev" and "dummy" dpifs.
375              * If the class doesn't match, skip this dpif. */
376              continue;
377         }
378         sset_add(all_dps, node->name);
379     }
380     ovs_mutex_unlock(&dp_netdev_mutex);
381
382     return 0;
383 }
384
385 static bool
386 dpif_netdev_class_is_dummy(const struct dpif_class *class)
387 {
388     return class != &dpif_netdev_class;
389 }
390
391 static const char *
392 dpif_netdev_port_open_type(const struct dpif_class *class, const char *type)
393 {
394     return strcmp(type, "internal") ? type
395                   : dpif_netdev_class_is_dummy(class) ? "dummy"
396                   : "tap";
397 }
398
399 static struct dpif *
400 create_dpif_netdev(struct dp_netdev *dp)
401 {
402     uint16_t netflow_id = hash_string(dp->name, 0);
403     struct dpif_netdev *dpif;
404
405     ovs_refcount_ref(&dp->ref_cnt);
406
407     dpif = xmalloc(sizeof *dpif);
408     dpif_init(&dpif->dpif, dp->class, dp->name, netflow_id >> 8, netflow_id);
409     dpif->dp = dp;
410     dpif->last_port_seq = seq_read(dp->port_seq);
411
412     return &dpif->dpif;
413 }
414
415 /* Choose an unused, non-zero port number and return it on success.
416  * Return ODPP_NONE on failure. */
417 static odp_port_t
418 choose_port(struct dp_netdev *dp, const char *name)
419     OVS_REQUIRES(dp->port_mutex)
420 {
421     uint32_t port_no;
422
423     if (dp->class != &dpif_netdev_class) {
424         const char *p;
425         int start_no = 0;
426
427         /* If the port name begins with "br", start the number search at
428          * 100 to make writing tests easier. */
429         if (!strncmp(name, "br", 2)) {
430             start_no = 100;
431         }
432
433         /* If the port name contains a number, try to assign that port number.
434          * This can make writing unit tests easier because port numbers are
435          * predictable. */
436         for (p = name; *p != '\0'; p++) {
437             if (isdigit((unsigned char) *p)) {
438                 port_no = start_no + strtol(p, NULL, 10);
439                 if (port_no > 0 && port_no != odp_to_u32(ODPP_NONE)
440                     && !dp_netdev_lookup_port(dp, u32_to_odp(port_no))) {
441                     return u32_to_odp(port_no);
442                 }
443                 break;
444             }
445         }
446     }
447
448     for (port_no = 1; port_no <= UINT16_MAX; port_no++) {
449         if (!dp_netdev_lookup_port(dp, u32_to_odp(port_no))) {
450             return u32_to_odp(port_no);
451         }
452     }
453
454     return ODPP_NONE;
455 }
456
457 static int
458 create_dp_netdev(const char *name, const struct dpif_class *class,
459                  struct dp_netdev **dpp)
460     OVS_REQUIRES(dp_netdev_mutex)
461 {
462     struct dp_netdev *dp;
463     int error;
464
465     dp = xzalloc(sizeof *dp);
466     shash_add(&dp_netdevs, name, dp);
467
468     *CONST_CAST(const struct dpif_class **, &dp->class) = class;
469     *CONST_CAST(const char **, &dp->name) = xstrdup(name);
470     ovs_refcount_init(&dp->ref_cnt);
471     atomic_flag_clear(&dp->destroyed);
472
473     ovs_mutex_init(&dp->flow_mutex);
474     classifier_init(&dp->cls, NULL);
475     hmap_init(&dp->flow_table);
476
477     fat_rwlock_init(&dp->queue_rwlock);
478
479     ovsthread_stats_init(&dp->stats);
480
481     ovs_mutex_init(&dp->port_mutex);
482     cmap_init(&dp->ports);
483     dp->port_seq = seq_create();
484     latch_init(&dp->exit_latch);
485
486     ovs_mutex_lock(&dp->port_mutex);
487     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
488     ovs_mutex_unlock(&dp->port_mutex);
489     if (error) {
490         dp_netdev_free(dp);
491         return error;
492     }
493
494     *dpp = dp;
495     return 0;
496 }
497
498 static int
499 dpif_netdev_open(const struct dpif_class *class, const char *name,
500                  bool create, struct dpif **dpifp)
501 {
502     struct dp_netdev *dp;
503     int error;
504
505     ovs_mutex_lock(&dp_netdev_mutex);
506     dp = shash_find_data(&dp_netdevs, name);
507     if (!dp) {
508         error = create ? create_dp_netdev(name, class, &dp) : ENODEV;
509     } else {
510         error = (dp->class != class ? EINVAL
511                  : create ? EEXIST
512                  : 0);
513     }
514     if (!error) {
515         *dpifp = create_dpif_netdev(dp);
516     }
517     ovs_mutex_unlock(&dp_netdev_mutex);
518
519     return error;
520 }
521
522 static void
523 dp_netdev_purge_queues(struct dp_netdev *dp)
524     OVS_REQ_WRLOCK(dp->queue_rwlock)
525 {
526     int i;
527
528     for (i = 0; i < dp->n_handlers; i++) {
529         struct dp_netdev_queue *q = &dp->handler_queues[i];
530
531         ovs_mutex_lock(&q->mutex);
532         while (q->tail != q->head) {
533             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
534             ofpbuf_uninit(&u->upcall.packet);
535             ofpbuf_uninit(&u->buf);
536         }
537         ovs_mutex_unlock(&q->mutex);
538     }
539 }
540
541 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
542  * through the 'dp_netdevs' shash while freeing 'dp'. */
543 static void
544 dp_netdev_free(struct dp_netdev *dp)
545     OVS_REQUIRES(dp_netdev_mutex)
546 {
547     struct dp_netdev_port *port;
548     struct dp_netdev_stats *bucket;
549     int i;
550
551     shash_find_and_delete(&dp_netdevs, dp->name);
552
553     dp_netdev_set_pmd_threads(dp, 0);
554     free(dp->pmd_threads);
555
556     dp_netdev_flow_flush(dp);
557     ovs_mutex_lock(&dp->port_mutex);
558     CMAP_FOR_EACH (port, node, &dp->ports) {
559         do_del_port(dp, port);
560     }
561     ovs_mutex_unlock(&dp->port_mutex);
562
563     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
564         ovs_mutex_destroy(&bucket->mutex);
565         free_cacheline(bucket);
566     }
567     ovsthread_stats_destroy(&dp->stats);
568
569     fat_rwlock_wrlock(&dp->queue_rwlock);
570     dp_netdev_destroy_all_queues(dp);
571     fat_rwlock_unlock(&dp->queue_rwlock);
572
573     fat_rwlock_destroy(&dp->queue_rwlock);
574
575     classifier_destroy(&dp->cls);
576     hmap_destroy(&dp->flow_table);
577     ovs_mutex_destroy(&dp->flow_mutex);
578     seq_destroy(dp->port_seq);
579     cmap_destroy(&dp->ports);
580     latch_destroy(&dp->exit_latch);
581     free(CONST_CAST(char *, dp->name));
582     free(dp);
583 }
584
585 static void
586 dp_netdev_unref(struct dp_netdev *dp)
587 {
588     if (dp) {
589         /* Take dp_netdev_mutex so that, if dp->ref_cnt falls to zero, we can't
590          * get a new reference to 'dp' through the 'dp_netdevs' shash. */
591         ovs_mutex_lock(&dp_netdev_mutex);
592         if (ovs_refcount_unref(&dp->ref_cnt) == 1) {
593             dp_netdev_free(dp);
594         }
595         ovs_mutex_unlock(&dp_netdev_mutex);
596     }
597 }
598
599 static void
600 dpif_netdev_close(struct dpif *dpif)
601 {
602     struct dp_netdev *dp = get_dp_netdev(dpif);
603
604     dp_netdev_unref(dp);
605     free(dpif);
606 }
607
608 static int
609 dpif_netdev_destroy(struct dpif *dpif)
610 {
611     struct dp_netdev *dp = get_dp_netdev(dpif);
612
613     if (!atomic_flag_test_and_set(&dp->destroyed)) {
614         if (ovs_refcount_unref(&dp->ref_cnt) == 1) {
615             /* Can't happen: 'dpif' still owns a reference to 'dp'. */
616             OVS_NOT_REACHED();
617         }
618     }
619
620     return 0;
621 }
622
623 static int
624 dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
625 {
626     struct dp_netdev *dp = get_dp_netdev(dpif);
627     struct dp_netdev_stats *bucket;
628     size_t i;
629
630     fat_rwlock_rdlock(&dp->cls.rwlock);
631     stats->n_flows = hmap_count(&dp->flow_table);
632     fat_rwlock_unlock(&dp->cls.rwlock);
633
634     stats->n_hit = stats->n_missed = stats->n_lost = 0;
635     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
636         ovs_mutex_lock(&bucket->mutex);
637         stats->n_hit += bucket->n[DP_STAT_HIT];
638         stats->n_missed += bucket->n[DP_STAT_MISS];
639         stats->n_lost += bucket->n[DP_STAT_LOST];
640         ovs_mutex_unlock(&bucket->mutex);
641     }
642     stats->n_masks = UINT32_MAX;
643     stats->n_mask_hit = UINT64_MAX;
644
645     return 0;
646 }
647
648 static void
649 dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
650 {
651     int i;
652
653     for (i = 0; i < dp->n_pmd_threads; i++) {
654         struct pmd_thread *f = &dp->pmd_threads[i];
655         int id;
656
657         atomic_add(&f->change_seq, 1, &id);
658    }
659 }
660
661 static uint32_t
662 hash_port_no(odp_port_t port_no)
663 {
664     return hash_int(odp_to_u32(port_no), 0);
665 }
666
667 static int
668 do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
669             odp_port_t port_no)
670     OVS_REQUIRES(dp->port_mutex)
671 {
672     struct netdev_saved_flags *sf;
673     struct dp_netdev_port *port;
674     struct netdev *netdev;
675     enum netdev_flags flags;
676     const char *open_type;
677     int error;
678     int i;
679
680     /* XXX reject devices already in some dp_netdev. */
681
682     /* Open and validate network device. */
683     open_type = dpif_netdev_port_open_type(dp->class, type);
684     error = netdev_open(devname, open_type, &netdev);
685     if (error) {
686         return error;
687     }
688     /* XXX reject non-Ethernet devices */
689
690     netdev_get_flags(netdev, &flags);
691     if (flags & NETDEV_LOOPBACK) {
692         VLOG_ERR("%s: cannot add a loopback device", devname);
693         netdev_close(netdev);
694         return EINVAL;
695     }
696
697     port = xzalloc(sizeof *port);
698     port->port_no = port_no;
699     port->netdev = netdev;
700     port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
701     port->type = xstrdup(type);
702     for (i = 0; i < netdev_n_rxq(netdev); i++) {
703         error = netdev_rxq_open(netdev, &port->rxq[i], i);
704         if (error
705             && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
706             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
707                      devname, ovs_strerror(errno));
708             netdev_close(netdev);
709             return error;
710         }
711     }
712
713     error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
714     if (error) {
715         for (i = 0; i < netdev_n_rxq(netdev); i++) {
716             netdev_rxq_close(port->rxq[i]);
717         }
718         netdev_close(netdev);
719         free(port->rxq);
720         free(port);
721         return error;
722     }
723     port->sf = sf;
724
725     if (netdev_is_pmd(netdev)) {
726         dp->pmd_count++;
727         dp_netdev_set_pmd_threads(dp, NR_THREADS);
728         dp_netdev_reload_pmd_threads(dp);
729     }
730     ovs_refcount_init(&port->ref_cnt);
731
732     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
733     seq_change(dp->port_seq);
734
735     return 0;
736 }
737
738 static int
739 dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev,
740                      odp_port_t *port_nop)
741 {
742     struct dp_netdev *dp = get_dp_netdev(dpif);
743     char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
744     const char *dpif_port;
745     odp_port_t port_no;
746     int error;
747
748     ovs_mutex_lock(&dp->port_mutex);
749     dpif_port = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
750     if (*port_nop != ODPP_NONE) {
751         port_no = *port_nop;
752         error = dp_netdev_lookup_port(dp, *port_nop) ? EBUSY : 0;
753     } else {
754         port_no = choose_port(dp, dpif_port);
755         error = port_no == ODPP_NONE ? EFBIG : 0;
756     }
757     if (!error) {
758         *port_nop = port_no;
759         error = do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no);
760     }
761     ovs_mutex_unlock(&dp->port_mutex);
762
763     return error;
764 }
765
766 static int
767 dpif_netdev_port_del(struct dpif *dpif, odp_port_t port_no)
768 {
769     struct dp_netdev *dp = get_dp_netdev(dpif);
770     int error;
771
772     ovs_mutex_lock(&dp->port_mutex);
773     if (port_no == ODPP_LOCAL) {
774         error = EINVAL;
775     } else {
776         struct dp_netdev_port *port;
777
778         error = get_port_by_number(dp, port_no, &port);
779         if (!error) {
780             do_del_port(dp, port);
781         }
782     }
783     ovs_mutex_unlock(&dp->port_mutex);
784
785     return error;
786 }
787
788 static bool
789 is_valid_port_number(odp_port_t port_no)
790 {
791     return port_no != ODPP_NONE;
792 }
793
794 static struct dp_netdev_port *
795 dp_netdev_lookup_port(const struct dp_netdev *dp, odp_port_t port_no)
796 {
797     struct dp_netdev_port *port;
798
799     CMAP_FOR_EACH_WITH_HASH (port, node, hash_port_no(port_no), &dp->ports) {
800         if (port->port_no == port_no) {
801             return port;
802         }
803     }
804     return NULL;
805 }
806
807 static int
808 get_port_by_number(struct dp_netdev *dp,
809                    odp_port_t port_no, struct dp_netdev_port **portp)
810 {
811     if (!is_valid_port_number(port_no)) {
812         *portp = NULL;
813         return EINVAL;
814     } else {
815         *portp = dp_netdev_lookup_port(dp, port_no);
816         return *portp ? 0 : ENOENT;
817     }
818 }
819
820 static void
821 port_ref(struct dp_netdev_port *port)
822 {
823     if (port) {
824         ovs_refcount_ref(&port->ref_cnt);
825     }
826 }
827
828 static void
829 port_destroy__(struct dp_netdev_port *port)
830 {
831     int n_rxq = netdev_n_rxq(port->netdev);
832     int i;
833
834     netdev_close(port->netdev);
835     netdev_restore_flags(port->sf);
836
837     for (i = 0; i < n_rxq; i++) {
838         netdev_rxq_close(port->rxq[i]);
839     }
840     free(port->rxq);
841     free(port->type);
842     free(port);
843 }
844
845 static void
846 port_unref(struct dp_netdev_port *port)
847 {
848     if (port && ovs_refcount_unref(&port->ref_cnt) == 1) {
849         ovsrcu_postpone(port_destroy__, port);
850     }
851 }
852
853 static int
854 get_port_by_name(struct dp_netdev *dp,
855                  const char *devname, struct dp_netdev_port **portp)
856     OVS_REQUIRES(dp->port_mutex)
857 {
858     struct dp_netdev_port *port;
859
860     CMAP_FOR_EACH (port, node, &dp->ports) {
861         if (!strcmp(netdev_get_name(port->netdev), devname)) {
862             *portp = port;
863             return 0;
864         }
865     }
866     return ENOENT;
867 }
868
869 static void
870 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
871     OVS_REQUIRES(dp->port_mutex)
872 {
873     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
874     seq_change(dp->port_seq);
875     if (netdev_is_pmd(port->netdev)) {
876         dp_netdev_reload_pmd_threads(dp);
877     }
878
879     port_unref(port);
880 }
881
882 static void
883 answer_port_query(const struct dp_netdev_port *port,
884                   struct dpif_port *dpif_port)
885 {
886     dpif_port->name = xstrdup(netdev_get_name(port->netdev));
887     dpif_port->type = xstrdup(port->type);
888     dpif_port->port_no = port->port_no;
889 }
890
891 static int
892 dpif_netdev_port_query_by_number(const struct dpif *dpif, odp_port_t port_no,
893                                  struct dpif_port *dpif_port)
894 {
895     struct dp_netdev *dp = get_dp_netdev(dpif);
896     struct dp_netdev_port *port;
897     int error;
898
899     error = get_port_by_number(dp, port_no, &port);
900     if (!error && dpif_port) {
901         answer_port_query(port, dpif_port);
902     }
903
904     return error;
905 }
906
907 static int
908 dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
909                                struct dpif_port *dpif_port)
910 {
911     struct dp_netdev *dp = get_dp_netdev(dpif);
912     struct dp_netdev_port *port;
913     int error;
914
915     ovs_mutex_lock(&dp->port_mutex);
916     error = get_port_by_name(dp, devname, &port);
917     if (!error && dpif_port) {
918         answer_port_query(port, dpif_port);
919     }
920     ovs_mutex_unlock(&dp->port_mutex);
921
922     return error;
923 }
924
925 static void
926 dp_netdev_flow_free(struct dp_netdev_flow *flow)
927 {
928     struct dp_netdev_flow_stats *bucket;
929     size_t i;
930
931     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
932         ovs_mutex_destroy(&bucket->mutex);
933         free_cacheline(bucket);
934     }
935     ovsthread_stats_destroy(&flow->stats);
936
937     cls_rule_destroy(CONST_CAST(struct cls_rule *, &flow->cr));
938     dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
939     free(flow);
940 }
941
942 static void
943 dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
944     OVS_REQ_WRLOCK(dp->cls.rwlock)
945     OVS_REQUIRES(dp->flow_mutex)
946 {
947     struct cls_rule *cr = CONST_CAST(struct cls_rule *, &flow->cr);
948     struct hmap_node *node = CONST_CAST(struct hmap_node *, &flow->node);
949
950     classifier_remove(&dp->cls, cr);
951     hmap_remove(&dp->flow_table, node);
952     ovsrcu_postpone(dp_netdev_flow_free, flow);
953 }
954
955 static void
956 dp_netdev_flow_flush(struct dp_netdev *dp)
957 {
958     struct dp_netdev_flow *netdev_flow, *next;
959
960     ovs_mutex_lock(&dp->flow_mutex);
961     fat_rwlock_wrlock(&dp->cls.rwlock);
962     HMAP_FOR_EACH_SAFE (netdev_flow, next, node, &dp->flow_table) {
963         dp_netdev_remove_flow(dp, netdev_flow);
964     }
965     fat_rwlock_unlock(&dp->cls.rwlock);
966     ovs_mutex_unlock(&dp->flow_mutex);
967 }
968
969 static int
970 dpif_netdev_flow_flush(struct dpif *dpif)
971 {
972     struct dp_netdev *dp = get_dp_netdev(dpif);
973
974     dp_netdev_flow_flush(dp);
975     return 0;
976 }
977
978 struct dp_netdev_port_state {
979     struct cmap_position position;
980     char *name;
981 };
982
983 static int
984 dpif_netdev_port_dump_start(const struct dpif *dpif OVS_UNUSED, void **statep)
985 {
986     *statep = xzalloc(sizeof(struct dp_netdev_port_state));
987     return 0;
988 }
989
990 static int
991 dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_,
992                            struct dpif_port *dpif_port)
993 {
994     struct dp_netdev_port_state *state = state_;
995     struct dp_netdev *dp = get_dp_netdev(dpif);
996     struct cmap_node *node;
997     int retval;
998
999     node = cmap_next_position(&dp->ports, &state->position);
1000     if (node) {
1001         struct dp_netdev_port *port;
1002
1003         port = CONTAINER_OF(node, struct dp_netdev_port, node);
1004
1005         free(state->name);
1006         state->name = xstrdup(netdev_get_name(port->netdev));
1007         dpif_port->name = state->name;
1008         dpif_port->type = port->type;
1009         dpif_port->port_no = port->port_no;
1010
1011         retval = 0;
1012     } else {
1013         retval = EOF;
1014     }
1015
1016     return retval;
1017 }
1018
1019 static int
1020 dpif_netdev_port_dump_done(const struct dpif *dpif OVS_UNUSED, void *state_)
1021 {
1022     struct dp_netdev_port_state *state = state_;
1023     free(state->name);
1024     free(state);
1025     return 0;
1026 }
1027
1028 static int
1029 dpif_netdev_port_poll(const struct dpif *dpif_, char **devnamep OVS_UNUSED)
1030 {
1031     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
1032     uint64_t new_port_seq;
1033     int error;
1034
1035     new_port_seq = seq_read(dpif->dp->port_seq);
1036     if (dpif->last_port_seq != new_port_seq) {
1037         dpif->last_port_seq = new_port_seq;
1038         error = ENOBUFS;
1039     } else {
1040         error = EAGAIN;
1041     }
1042
1043     return error;
1044 }
1045
1046 static void
1047 dpif_netdev_port_poll_wait(const struct dpif *dpif_)
1048 {
1049     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
1050
1051     seq_wait(dpif->dp->port_seq, dpif->last_port_seq);
1052 }
1053
1054 static struct dp_netdev_flow *
1055 dp_netdev_flow_cast(const struct cls_rule *cr)
1056 {
1057     return cr ? CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL;
1058 }
1059
1060 static struct dp_netdev_flow *
1061 dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key)
1062     OVS_EXCLUDED(dp->cls.rwlock)
1063 {
1064     struct dp_netdev_flow *netdev_flow;
1065     struct cls_rule *rule;
1066
1067     fat_rwlock_rdlock(&dp->cls.rwlock);
1068     rule = classifier_lookup_miniflow_first(&dp->cls, key);
1069     netdev_flow = dp_netdev_flow_cast(rule);
1070     fat_rwlock_unlock(&dp->cls.rwlock);
1071
1072     return netdev_flow;
1073 }
1074
1075 static struct dp_netdev_flow *
1076 dp_netdev_find_flow(const struct dp_netdev *dp, const struct flow *flow)
1077     OVS_REQ_RDLOCK(dp->cls.rwlock)
1078 {
1079     struct dp_netdev_flow *netdev_flow;
1080
1081     HMAP_FOR_EACH_WITH_HASH (netdev_flow, node, flow_hash(flow, 0),
1082                              &dp->flow_table) {
1083         if (flow_equal(&netdev_flow->flow, flow)) {
1084             return netdev_flow;
1085         }
1086     }
1087
1088     return NULL;
1089 }
1090
1091 static void
1092 get_dpif_flow_stats(struct dp_netdev_flow *netdev_flow,
1093                     struct dpif_flow_stats *stats)
1094 {
1095     struct dp_netdev_flow_stats *bucket;
1096     size_t i;
1097
1098     memset(stats, 0, sizeof *stats);
1099     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
1100         ovs_mutex_lock(&bucket->mutex);
1101         stats->n_packets += bucket->packet_count;
1102         stats->n_bytes += bucket->byte_count;
1103         stats->used = MAX(stats->used, bucket->used);
1104         stats->tcp_flags |= bucket->tcp_flags;
1105         ovs_mutex_unlock(&bucket->mutex);
1106     }
1107 }
1108
1109 static int
1110 dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
1111                               const struct nlattr *mask_key,
1112                               uint32_t mask_key_len, const struct flow *flow,
1113                               struct flow *mask)
1114 {
1115     if (mask_key_len) {
1116         enum odp_key_fitness fitness;
1117
1118         fitness = odp_flow_key_to_mask(mask_key, mask_key_len, mask, flow);
1119         if (fitness) {
1120             /* This should not happen: it indicates that
1121              * odp_flow_key_from_mask() and odp_flow_key_to_mask()
1122              * disagree on the acceptable form of a mask.  Log the problem
1123              * as an error, with enough details to enable debugging. */
1124             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1125
1126             if (!VLOG_DROP_ERR(&rl)) {
1127                 struct ds s;
1128
1129                 ds_init(&s);
1130                 odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
1131                                 true);
1132                 VLOG_ERR("internal error parsing flow mask %s (%s)",
1133                          ds_cstr(&s), odp_key_fitness_to_string(fitness));
1134                 ds_destroy(&s);
1135             }
1136
1137             return EINVAL;
1138         }
1139     } else {
1140         enum mf_field_id id;
1141         /* No mask key, unwildcard everything except fields whose
1142          * prerequisities are not met. */
1143         memset(mask, 0x0, sizeof *mask);
1144
1145         for (id = 0; id < MFF_N_IDS; ++id) {
1146             /* Skip registers and metadata. */
1147             if (!(id >= MFF_REG0 && id < MFF_REG0 + FLOW_N_REGS)
1148                 && id != MFF_METADATA) {
1149                 const struct mf_field *mf = mf_from_id(id);
1150                 if (mf_are_prereqs_ok(mf, flow)) {
1151                     mf_mask_field(mf, mask);
1152                 }
1153             }
1154         }
1155     }
1156
1157     /* Force unwildcard the in_port.
1158      *
1159      * We need to do this even in the case where we unwildcard "everything"
1160      * above because "everything" only includes the 16-bit OpenFlow port number
1161      * mask->in_port.ofp_port, which only covers half of the 32-bit datapath
1162      * port number mask->in_port.odp_port. */
1163     mask->in_port.odp_port = u32_to_odp(UINT32_MAX);
1164
1165     return 0;
1166 }
1167
1168 static int
1169 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
1170                               struct flow *flow)
1171 {
1172     odp_port_t in_port;
1173
1174     if (odp_flow_key_to_flow(key, key_len, flow)) {
1175         /* This should not happen: it indicates that odp_flow_key_from_flow()
1176          * and odp_flow_key_to_flow() disagree on the acceptable form of a
1177          * flow.  Log the problem as an error, with enough details to enable
1178          * debugging. */
1179         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1180
1181         if (!VLOG_DROP_ERR(&rl)) {
1182             struct ds s;
1183
1184             ds_init(&s);
1185             odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
1186             VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
1187             ds_destroy(&s);
1188         }
1189
1190         return EINVAL;
1191     }
1192
1193     in_port = flow->in_port.odp_port;
1194     if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
1195         return EINVAL;
1196     }
1197
1198     return 0;
1199 }
1200
1201 static int
1202 dpif_netdev_flow_get(const struct dpif *dpif,
1203                      const struct nlattr *nl_key, size_t nl_key_len,
1204                      struct ofpbuf **actionsp, struct dpif_flow_stats *stats)
1205 {
1206     struct dp_netdev *dp = get_dp_netdev(dpif);
1207     struct dp_netdev_flow *netdev_flow;
1208     struct flow key;
1209     int error;
1210
1211     error = dpif_netdev_flow_from_nlattrs(nl_key, nl_key_len, &key);
1212     if (error) {
1213         return error;
1214     }
1215
1216     fat_rwlock_rdlock(&dp->cls.rwlock);
1217     netdev_flow = dp_netdev_find_flow(dp, &key);
1218     fat_rwlock_unlock(&dp->cls.rwlock);
1219
1220     if (netdev_flow) {
1221         if (stats) {
1222             get_dpif_flow_stats(netdev_flow, stats);
1223         }
1224
1225         if (actionsp) {
1226             struct dp_netdev_actions *actions;
1227
1228             actions = dp_netdev_flow_get_actions(netdev_flow);
1229             *actionsp = ofpbuf_clone_data(actions->actions, actions->size);
1230         }
1231      } else {
1232         error = ENOENT;
1233     }
1234
1235     return error;
1236 }
1237
1238 static int
1239 dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *flow,
1240                    const struct flow_wildcards *wc,
1241                    const struct nlattr *actions,
1242                    size_t actions_len)
1243     OVS_REQUIRES(dp->flow_mutex)
1244 {
1245     struct dp_netdev_flow *netdev_flow;
1246     struct match match;
1247
1248     netdev_flow = xzalloc(sizeof *netdev_flow);
1249     *CONST_CAST(struct flow *, &netdev_flow->flow) = *flow;
1250
1251     ovsthread_stats_init(&netdev_flow->stats);
1252
1253     ovsrcu_set(&netdev_flow->actions,
1254                dp_netdev_actions_create(actions, actions_len));
1255
1256     match_init(&match, flow, wc);
1257     cls_rule_init(CONST_CAST(struct cls_rule *, &netdev_flow->cr),
1258                   &match, NETDEV_RULE_PRIORITY);
1259     fat_rwlock_wrlock(&dp->cls.rwlock);
1260     classifier_insert(&dp->cls,
1261                       CONST_CAST(struct cls_rule *, &netdev_flow->cr));
1262     hmap_insert(&dp->flow_table,
1263                 CONST_CAST(struct hmap_node *, &netdev_flow->node),
1264                 flow_hash(flow, 0));
1265     fat_rwlock_unlock(&dp->cls.rwlock);
1266
1267     return 0;
1268 }
1269
1270 static void
1271 clear_stats(struct dp_netdev_flow *netdev_flow)
1272 {
1273     struct dp_netdev_flow_stats *bucket;
1274     size_t i;
1275
1276     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
1277         ovs_mutex_lock(&bucket->mutex);
1278         bucket->used = 0;
1279         bucket->packet_count = 0;
1280         bucket->byte_count = 0;
1281         bucket->tcp_flags = 0;
1282         ovs_mutex_unlock(&bucket->mutex);
1283     }
1284 }
1285
1286 static int
1287 dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
1288 {
1289     struct dp_netdev *dp = get_dp_netdev(dpif);
1290     struct dp_netdev_flow *netdev_flow;
1291     struct flow flow;
1292     struct miniflow miniflow;
1293     struct flow_wildcards wc;
1294     int error;
1295
1296     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &flow);
1297     if (error) {
1298         return error;
1299     }
1300     error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
1301                                           put->mask, put->mask_len,
1302                                           &flow, &wc.masks);
1303     if (error) {
1304         return error;
1305     }
1306     miniflow_init(&miniflow, &flow);
1307
1308     ovs_mutex_lock(&dp->flow_mutex);
1309     netdev_flow = dp_netdev_lookup_flow(dp, &miniflow);
1310     if (!netdev_flow) {
1311         if (put->flags & DPIF_FP_CREATE) {
1312             if (hmap_count(&dp->flow_table) < MAX_FLOWS) {
1313                 if (put->stats) {
1314                     memset(put->stats, 0, sizeof *put->stats);
1315                 }
1316                 error = dp_netdev_flow_add(dp, &flow, &wc, put->actions,
1317                                            put->actions_len);
1318             } else {
1319                 error = EFBIG;
1320             }
1321         } else {
1322             error = ENOENT;
1323         }
1324     } else {
1325         if (put->flags & DPIF_FP_MODIFY
1326             && flow_equal(&flow, &netdev_flow->flow)) {
1327             struct dp_netdev_actions *new_actions;
1328             struct dp_netdev_actions *old_actions;
1329
1330             new_actions = dp_netdev_actions_create(put->actions,
1331                                                    put->actions_len);
1332
1333             old_actions = dp_netdev_flow_get_actions(netdev_flow);
1334             ovsrcu_set(&netdev_flow->actions, new_actions);
1335
1336             if (put->stats) {
1337                 get_dpif_flow_stats(netdev_flow, put->stats);
1338             }
1339             if (put->flags & DPIF_FP_ZERO_STATS) {
1340                 clear_stats(netdev_flow);
1341             }
1342
1343             ovsrcu_postpone(dp_netdev_actions_free, old_actions);
1344         } else if (put->flags & DPIF_FP_CREATE) {
1345             error = EEXIST;
1346         } else {
1347             /* Overlapping flow. */
1348             error = EINVAL;
1349         }
1350     }
1351     ovs_mutex_unlock(&dp->flow_mutex);
1352
1353     return error;
1354 }
1355
1356 static int
1357 dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
1358 {
1359     struct dp_netdev *dp = get_dp_netdev(dpif);
1360     struct dp_netdev_flow *netdev_flow;
1361     struct flow key;
1362     int error;
1363
1364     error = dpif_netdev_flow_from_nlattrs(del->key, del->key_len, &key);
1365     if (error) {
1366         return error;
1367     }
1368
1369     ovs_mutex_lock(&dp->flow_mutex);
1370     fat_rwlock_wrlock(&dp->cls.rwlock);
1371     netdev_flow = dp_netdev_find_flow(dp, &key);
1372     if (netdev_flow) {
1373         if (del->stats) {
1374             get_dpif_flow_stats(netdev_flow, del->stats);
1375         }
1376         dp_netdev_remove_flow(dp, netdev_flow);
1377     } else {
1378         error = ENOENT;
1379     }
1380     fat_rwlock_unlock(&dp->cls.rwlock);
1381     ovs_mutex_unlock(&dp->flow_mutex);
1382
1383     return error;
1384 }
1385
1386 struct dpif_netdev_flow_dump {
1387     struct dpif_flow_dump up;
1388     uint32_t bucket;
1389     uint32_t offset;
1390     int status;
1391     struct ovs_mutex mutex;
1392 };
1393
1394 static struct dpif_netdev_flow_dump *
1395 dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
1396 {
1397     return CONTAINER_OF(dump, struct dpif_netdev_flow_dump, up);
1398 }
1399
1400 static struct dpif_flow_dump *
1401 dpif_netdev_flow_dump_create(const struct dpif *dpif_)
1402 {
1403     struct dpif_netdev_flow_dump *dump;
1404
1405     dump = xmalloc(sizeof *dump);
1406     dpif_flow_dump_init(&dump->up, dpif_);
1407     dump->bucket = 0;
1408     dump->offset = 0;
1409     dump->status = 0;
1410     ovs_mutex_init(&dump->mutex);
1411
1412     return &dump->up;
1413 }
1414
1415 static int
1416 dpif_netdev_flow_dump_destroy(struct dpif_flow_dump *dump_)
1417 {
1418     struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_);
1419
1420     ovs_mutex_destroy(&dump->mutex);
1421     free(dump);
1422     return 0;
1423 }
1424
1425 struct dpif_netdev_flow_dump_thread {
1426     struct dpif_flow_dump_thread up;
1427     struct dpif_netdev_flow_dump *dump;
1428     struct odputil_keybuf keybuf;
1429     struct odputil_keybuf maskbuf;
1430 };
1431
1432 static struct dpif_netdev_flow_dump_thread *
1433 dpif_netdev_flow_dump_thread_cast(struct dpif_flow_dump_thread *thread)
1434 {
1435     return CONTAINER_OF(thread, struct dpif_netdev_flow_dump_thread, up);
1436 }
1437
1438 static struct dpif_flow_dump_thread *
1439 dpif_netdev_flow_dump_thread_create(struct dpif_flow_dump *dump_)
1440 {
1441     struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_);
1442     struct dpif_netdev_flow_dump_thread *thread;
1443
1444     thread = xmalloc(sizeof *thread);
1445     dpif_flow_dump_thread_init(&thread->up, &dump->up);
1446     thread->dump = dump;
1447     return &thread->up;
1448 }
1449
1450 static void
1451 dpif_netdev_flow_dump_thread_destroy(struct dpif_flow_dump_thread *thread_)
1452 {
1453     struct dpif_netdev_flow_dump_thread *thread
1454         = dpif_netdev_flow_dump_thread_cast(thread_);
1455
1456     free(thread);
1457 }
1458
1459 /* XXX the caller must use 'actions' without quiescing */
1460 static int
1461 dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
1462                            struct dpif_flow *f, int max_flows OVS_UNUSED)
1463 {
1464     struct dpif_netdev_flow_dump_thread *thread
1465         = dpif_netdev_flow_dump_thread_cast(thread_);
1466     struct dpif_netdev_flow_dump *dump = thread->dump;
1467     struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
1468     struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
1469     struct dp_netdev_flow *netdev_flow;
1470     struct flow_wildcards wc;
1471     struct dp_netdev_actions *dp_actions;
1472     struct ofpbuf buf;
1473     int error;
1474
1475     ovs_mutex_lock(&dump->mutex);
1476     error = dump->status;
1477     if (!error) {
1478         struct hmap_node *node;
1479
1480         fat_rwlock_rdlock(&dp->cls.rwlock);
1481         node = hmap_at_position(&dp->flow_table, &dump->bucket, &dump->offset);
1482         if (node) {
1483             netdev_flow = CONTAINER_OF(node, struct dp_netdev_flow, node);
1484         }
1485         fat_rwlock_unlock(&dp->cls.rwlock);
1486         if (!node) {
1487             dump->status = error = EOF;
1488         }
1489     }
1490     ovs_mutex_unlock(&dump->mutex);
1491     if (error) {
1492         return 0;
1493     }
1494
1495     minimask_expand(&netdev_flow->cr.match.mask, &wc);
1496
1497     /* Key. */
1498     ofpbuf_use_stack(&buf, &thread->keybuf, sizeof thread->keybuf);
1499     odp_flow_key_from_flow(&buf, &netdev_flow->flow, &wc.masks,
1500                            netdev_flow->flow.in_port.odp_port, true);
1501     f->key = ofpbuf_data(&buf);
1502     f->key_len = ofpbuf_size(&buf);
1503
1504     /* Mask. */
1505     ofpbuf_use_stack(&buf, &thread->maskbuf, sizeof thread->maskbuf);
1506     odp_flow_key_from_mask(&buf, &wc.masks, &netdev_flow->flow,
1507                            odp_to_u32(wc.masks.in_port.odp_port),
1508                            SIZE_MAX, true);
1509     f->mask = ofpbuf_data(&buf);
1510     f->mask_len = ofpbuf_size(&buf);
1511
1512     /* Actions. */
1513     dp_actions = dp_netdev_flow_get_actions(netdev_flow);
1514     f->actions = dp_actions->actions;
1515     f->actions_len = dp_actions->size;
1516
1517     /* Stats. */
1518     get_dpif_flow_stats(netdev_flow, &f->stats);
1519
1520     return 1;
1521 }
1522
1523 static int
1524 dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
1525 {
1526     struct dp_netdev *dp = get_dp_netdev(dpif);
1527     struct dpif_packet packet;
1528     struct pkt_metadata *md = &execute->md;
1529     struct {
1530         struct miniflow flow;
1531         uint32_t buf[FLOW_U32S];
1532     } key;
1533
1534     if (ofpbuf_size(execute->packet) < ETH_HEADER_LEN ||
1535         ofpbuf_size(execute->packet) > UINT16_MAX) {
1536         return EINVAL;
1537     }
1538
1539     /* Extract flow key. */
1540     miniflow_initialize(&key.flow, key.buf);
1541     miniflow_extract(execute->packet, md, &key.flow);
1542
1543     packet.ofpbuf = *execute->packet;
1544
1545     dp_netdev_execute_actions(dp, &key.flow, &packet, false, md,
1546                               execute->actions, execute->actions_len);
1547
1548     /* Even though may_steal is set to false, some actions could modify or
1549      * reallocate the ofpbuf memory. We need to pass those changes to the
1550      * caller */
1551     *execute->packet = packet.ofpbuf;
1552
1553     return 0;
1554 }
1555
1556 static void
1557 dp_netdev_destroy_all_queues(struct dp_netdev *dp)
1558     OVS_REQ_WRLOCK(dp->queue_rwlock)
1559 {
1560     size_t i;
1561
1562     dp_netdev_purge_queues(dp);
1563
1564     for (i = 0; i < dp->n_handlers; i++) {
1565         struct dp_netdev_queue *q = &dp->handler_queues[i];
1566
1567         ovs_mutex_destroy(&q->mutex);
1568         seq_destroy(q->seq);
1569     }
1570     free(dp->handler_queues);
1571     dp->handler_queues = NULL;
1572     dp->n_handlers = 0;
1573 }
1574
1575 static void
1576 dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
1577     OVS_REQ_WRLOCK(dp->queue_rwlock)
1578 {
1579     if (dp->n_handlers != n_handlers) {
1580         size_t i;
1581
1582         dp_netdev_destroy_all_queues(dp);
1583
1584         dp->n_handlers = n_handlers;
1585         dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
1586
1587         for (i = 0; i < n_handlers; i++) {
1588             struct dp_netdev_queue *q = &dp->handler_queues[i];
1589
1590             ovs_mutex_init(&q->mutex);
1591             q->seq = seq_create();
1592         }
1593     }
1594 }
1595
1596 static int
1597 dpif_netdev_recv_set(struct dpif *dpif, bool enable)
1598 {
1599     struct dp_netdev *dp = get_dp_netdev(dpif);
1600
1601     if ((dp->handler_queues != NULL) == enable) {
1602         return 0;
1603     }
1604
1605     fat_rwlock_wrlock(&dp->queue_rwlock);
1606     if (!enable) {
1607         dp_netdev_destroy_all_queues(dp);
1608     } else {
1609         dp_netdev_refresh_queues(dp, 1);
1610     }
1611     fat_rwlock_unlock(&dp->queue_rwlock);
1612
1613     return 0;
1614 }
1615
1616 static int
1617 dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
1618 {
1619     struct dp_netdev *dp = get_dp_netdev(dpif);
1620
1621     fat_rwlock_wrlock(&dp->queue_rwlock);
1622     if (dp->handler_queues) {
1623         dp_netdev_refresh_queues(dp, n_handlers);
1624     }
1625     fat_rwlock_unlock(&dp->queue_rwlock);
1626
1627     return 0;
1628 }
1629
1630 static int
1631 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
1632                               uint32_t queue_id, uint32_t *priority)
1633 {
1634     *priority = queue_id;
1635     return 0;
1636 }
1637
1638 static bool
1639 dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
1640     OVS_REQ_RDLOCK(dp->queue_rwlock)
1641 {
1642     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1643
1644     if (!dp->handler_queues) {
1645         VLOG_WARN_RL(&rl, "receiving upcall disabled");
1646         return false;
1647     }
1648
1649     if (handler_id >= dp->n_handlers) {
1650         VLOG_WARN_RL(&rl, "handler index out of bound");
1651         return false;
1652     }
1653
1654     return true;
1655 }
1656
1657 static int
1658 dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
1659                  struct dpif_upcall *upcall, struct ofpbuf *buf)
1660 {
1661     struct dp_netdev *dp = get_dp_netdev(dpif);
1662     struct dp_netdev_queue *q;
1663     int error = 0;
1664
1665     fat_rwlock_rdlock(&dp->queue_rwlock);
1666
1667     if (!dp_netdev_recv_check(dp, handler_id)) {
1668         error = EAGAIN;
1669         goto out;
1670     }
1671
1672     q = &dp->handler_queues[handler_id];
1673     ovs_mutex_lock(&q->mutex);
1674     if (q->head != q->tail) {
1675         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
1676
1677         *upcall = u->upcall;
1678
1679         ofpbuf_uninit(buf);
1680         *buf = u->buf;
1681     } else {
1682         error = EAGAIN;
1683     }
1684     ovs_mutex_unlock(&q->mutex);
1685
1686 out:
1687     fat_rwlock_unlock(&dp->queue_rwlock);
1688
1689     return error;
1690 }
1691
1692 static void
1693 dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
1694 {
1695     struct dp_netdev *dp = get_dp_netdev(dpif);
1696     struct dp_netdev_queue *q;
1697     uint64_t seq;
1698
1699     fat_rwlock_rdlock(&dp->queue_rwlock);
1700
1701     if (!dp_netdev_recv_check(dp, handler_id)) {
1702         goto out;
1703     }
1704
1705     q = &dp->handler_queues[handler_id];
1706     ovs_mutex_lock(&q->mutex);
1707     seq = seq_read(q->seq);
1708     if (q->head != q->tail) {
1709         poll_immediate_wake();
1710     } else {
1711         seq_wait(q->seq, seq);
1712     }
1713
1714     ovs_mutex_unlock(&q->mutex);
1715
1716 out:
1717     fat_rwlock_unlock(&dp->queue_rwlock);
1718 }
1719
1720 static void
1721 dpif_netdev_recv_purge(struct dpif *dpif)
1722 {
1723     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1724
1725     fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
1726     dp_netdev_purge_queues(dpif_netdev->dp);
1727     fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
1728 }
1729 \f
1730 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
1731  * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
1732  * 'ofpacts'. */
1733 struct dp_netdev_actions *
1734 dp_netdev_actions_create(const struct nlattr *actions, size_t size)
1735 {
1736     struct dp_netdev_actions *netdev_actions;
1737
1738     netdev_actions = xmalloc(sizeof *netdev_actions);
1739     netdev_actions->actions = xmemdup(actions, size);
1740     netdev_actions->size = size;
1741
1742     return netdev_actions;
1743 }
1744
1745 struct dp_netdev_actions *
1746 dp_netdev_flow_get_actions(const struct dp_netdev_flow *flow)
1747 {
1748     return ovsrcu_get(struct dp_netdev_actions *, &flow->actions);
1749 }
1750
1751 static void
1752 dp_netdev_actions_free(struct dp_netdev_actions *actions)
1753 {
1754     free(actions->actions);
1755     free(actions);
1756 }
1757 \f
1758
1759 static void
1760 dp_netdev_process_rxq_port(struct dp_netdev *dp,
1761                           struct dp_netdev_port *port,
1762                           struct netdev_rxq *rxq)
1763 {
1764     struct dpif_packet *packet[NETDEV_MAX_RX_BATCH];
1765     int error, c;
1766
1767     error = netdev_rxq_recv(rxq, packet, &c);
1768     if (!error) {
1769         struct pkt_metadata md = PKT_METADATA_INITIALIZER(port->port_no);
1770         int i;
1771
1772         for (i = 0; i < c; i++) {
1773             dp_netdev_port_input(dp, packet[i], &md);
1774         }
1775     } else if (error != EAGAIN && error != EOPNOTSUPP) {
1776         static struct vlog_rate_limit rl
1777             = VLOG_RATE_LIMIT_INIT(1, 5);
1778
1779         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
1780                     netdev_get_name(port->netdev),
1781                     ovs_strerror(error));
1782     }
1783 }
1784
1785 static void
1786 dpif_netdev_run(struct dpif *dpif)
1787 {
1788     struct dp_netdev_port *port;
1789     struct dp_netdev *dp = get_dp_netdev(dpif);
1790
1791     CMAP_FOR_EACH (port, node, &dp->ports) {
1792         if (!netdev_is_pmd(port->netdev)) {
1793             int i;
1794
1795             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1796                 dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
1797             }
1798         }
1799     }
1800 }
1801
1802 static void
1803 dpif_netdev_wait(struct dpif *dpif)
1804 {
1805     struct dp_netdev_port *port;
1806     struct dp_netdev *dp = get_dp_netdev(dpif);
1807
1808     ovs_mutex_lock(&dp_netdev_mutex);
1809     CMAP_FOR_EACH (port, node, &dp->ports) {
1810         if (!netdev_is_pmd(port->netdev)) {
1811             int i;
1812
1813             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1814                 netdev_rxq_wait(port->rxq[i]);
1815             }
1816         }
1817     }
1818     ovs_mutex_unlock(&dp_netdev_mutex);
1819 }
1820
1821 struct rxq_poll {
1822     struct dp_netdev_port *port;
1823     struct netdev_rxq *rx;
1824 };
1825
1826 static int
1827 pmd_load_queues(struct pmd_thread *f,
1828                 struct rxq_poll **ppoll_list, int poll_cnt)
1829 {
1830     struct dp_netdev *dp = f->dp;
1831     struct rxq_poll *poll_list = *ppoll_list;
1832     struct dp_netdev_port *port;
1833     int id = f->id;
1834     int index;
1835     int i;
1836
1837     /* Simple scheduler for netdev rx polling. */
1838     for (i = 0; i < poll_cnt; i++) {
1839          port_unref(poll_list[i].port);
1840     }
1841
1842     poll_cnt = 0;
1843     index = 0;
1844
1845     CMAP_FOR_EACH (port, node, &f->dp->ports) {
1846         if (netdev_is_pmd(port->netdev)) {
1847             int i;
1848
1849             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1850                 if ((index % dp->n_pmd_threads) == id) {
1851                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
1852
1853                     port_ref(port);
1854                     poll_list[poll_cnt].port = port;
1855                     poll_list[poll_cnt].rx = port->rxq[i];
1856                     poll_cnt++;
1857                 }
1858                 index++;
1859             }
1860         }
1861     }
1862
1863     *ppoll_list = poll_list;
1864     return poll_cnt;
1865 }
1866
1867 static void *
1868 pmd_thread_main(void *f_)
1869 {
1870     struct pmd_thread *f = f_;
1871     struct dp_netdev *dp = f->dp;
1872     unsigned int lc = 0;
1873     struct rxq_poll *poll_list;
1874     unsigned int port_seq;
1875     int poll_cnt;
1876     int i;
1877
1878     poll_cnt = 0;
1879     poll_list = NULL;
1880
1881     pmd_thread_setaffinity_cpu(f->id);
1882 reload:
1883     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
1884     atomic_read(&f->change_seq, &port_seq);
1885
1886     for (;;) {
1887         unsigned int c_port_seq;
1888         int i;
1889
1890         for (i = 0; i < poll_cnt; i++) {
1891             dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
1892         }
1893
1894         if (lc++ > 1024) {
1895             ovsrcu_quiesce();
1896
1897             /* TODO: need completely userspace based signaling method.
1898              * to keep this thread entirely in userspace.
1899              * For now using atomic counter. */
1900             lc = 0;
1901             atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
1902             if (c_port_seq != port_seq) {
1903                 break;
1904             }
1905         }
1906     }
1907
1908     if (!latch_is_set(&f->dp->exit_latch)){
1909         goto reload;
1910     }
1911
1912     for (i = 0; i < poll_cnt; i++) {
1913          port_unref(poll_list[i].port);
1914     }
1915
1916     free(poll_list);
1917     return NULL;
1918 }
1919
1920 static void
1921 dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
1922 {
1923     int i;
1924
1925     if (n == dp->n_pmd_threads) {
1926         return;
1927     }
1928
1929     /* Stop existing threads. */
1930     latch_set(&dp->exit_latch);
1931     dp_netdev_reload_pmd_threads(dp);
1932     for (i = 0; i < dp->n_pmd_threads; i++) {
1933         struct pmd_thread *f = &dp->pmd_threads[i];
1934
1935         xpthread_join(f->thread, NULL);
1936     }
1937     latch_poll(&dp->exit_latch);
1938     free(dp->pmd_threads);
1939
1940     /* Start new threads. */
1941     dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
1942     dp->n_pmd_threads = n;
1943
1944     for (i = 0; i < n; i++) {
1945         struct pmd_thread *f = &dp->pmd_threads[i];
1946
1947         f->dp = dp;
1948         f->id = i;
1949         atomic_store(&f->change_seq, 1);
1950
1951         /* Each thread will distribute all devices rx-queues among
1952          * themselves. */
1953         f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
1954     }
1955 }
1956
1957 \f
1958 static void *
1959 dp_netdev_flow_stats_new_cb(void)
1960 {
1961     struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
1962     ovs_mutex_init(&bucket->mutex);
1963     return bucket;
1964 }
1965
1966 static void
1967 dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
1968                     const struct ofpbuf *packet,
1969                     const struct miniflow *key)
1970 {
1971     uint16_t tcp_flags = miniflow_get_tcp_flags(key);
1972     long long int now = time_msec();
1973     struct dp_netdev_flow_stats *bucket;
1974
1975     bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
1976                                         dp_netdev_flow_stats_new_cb);
1977
1978     ovs_mutex_lock(&bucket->mutex);
1979     bucket->used = MAX(now, bucket->used);
1980     bucket->packet_count++;
1981     bucket->byte_count += ofpbuf_size(packet);
1982     bucket->tcp_flags |= tcp_flags;
1983     ovs_mutex_unlock(&bucket->mutex);
1984 }
1985
1986 static void *
1987 dp_netdev_stats_new_cb(void)
1988 {
1989     struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
1990     ovs_mutex_init(&bucket->mutex);
1991     return bucket;
1992 }
1993
1994 static void
1995 dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type)
1996 {
1997     struct dp_netdev_stats *bucket;
1998
1999     bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
2000     ovs_mutex_lock(&bucket->mutex);
2001     bucket->n[type]++;
2002     ovs_mutex_unlock(&bucket->mutex);
2003 }
2004
2005 static void
2006 dp_netdev_input(struct dp_netdev *dp, struct dpif_packet *packet,
2007                 struct pkt_metadata *md)
2008 {
2009     struct dp_netdev_flow *netdev_flow;
2010     struct ofpbuf *buf = &packet->ofpbuf;
2011     struct {
2012         struct miniflow flow;
2013         uint32_t buf[FLOW_U32S];
2014     } key;
2015
2016     if (ofpbuf_size(buf) < ETH_HEADER_LEN) {
2017         dpif_packet_delete(packet);
2018         return;
2019     }
2020     miniflow_initialize(&key.flow, key.buf);
2021     miniflow_extract(buf, md, &key.flow);
2022
2023     netdev_flow = dp_netdev_lookup_flow(dp, &key.flow);
2024     if (netdev_flow) {
2025         struct dp_netdev_actions *actions;
2026
2027         dp_netdev_flow_used(netdev_flow, buf, &key.flow);
2028
2029         actions = dp_netdev_flow_get_actions(netdev_flow);
2030         dp_netdev_execute_actions(dp, &key.flow, packet, true, md,
2031                                   actions->actions, actions->size);
2032         dp_netdev_count_packet(dp, DP_STAT_HIT);
2033     } else if (dp->handler_queues) {
2034         dp_netdev_count_packet(dp, DP_STAT_MISS);
2035         dp_netdev_output_userspace(dp, buf, miniflow_hash_5tuple(&key.flow, 0)
2036                                    % dp->n_handlers,
2037                                    DPIF_UC_MISS, &key.flow, NULL);
2038     }
2039 }
2040
2041 static void
2042 dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet *packet,
2043                      struct pkt_metadata *md)
2044 {
2045     uint32_t *recirc_depth = recirc_depth_get();
2046
2047     *recirc_depth = 0;
2048     dp_netdev_input(dp, packet, md);
2049 }
2050
2051 static int
2052 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf *packet,
2053                            int queue_no, int type, const struct miniflow *key,
2054                            const struct nlattr *userdata)
2055 {
2056     struct dp_netdev_queue *q;
2057     int error;
2058
2059     fat_rwlock_rdlock(&dp->queue_rwlock);
2060     q = &dp->handler_queues[queue_no];
2061     ovs_mutex_lock(&q->mutex);
2062     if (q->head - q->tail < MAX_QUEUE_LEN) {
2063         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
2064         struct dpif_upcall *upcall = &u->upcall;
2065         struct ofpbuf *buf = &u->buf;
2066         size_t buf_size;
2067         struct flow flow;
2068
2069         upcall->type = type;
2070
2071         /* Allocate buffer big enough for everything. */
2072         buf_size = ODPUTIL_FLOW_KEY_BYTES;
2073         if (userdata) {
2074             buf_size += NLA_ALIGN(userdata->nla_len);
2075         }
2076         ofpbuf_init(buf, buf_size);
2077
2078         /* Put ODP flow. */
2079         miniflow_expand(key, &flow);
2080         odp_flow_key_from_flow(buf, &flow, NULL, flow.in_port.odp_port, true);
2081         upcall->key = ofpbuf_data(buf);
2082         upcall->key_len = ofpbuf_size(buf);
2083
2084         /* Put userdata. */
2085         if (userdata) {
2086             upcall->userdata = ofpbuf_put(buf, userdata,
2087                                           NLA_ALIGN(userdata->nla_len));
2088         }
2089
2090         upcall->packet = *packet;
2091
2092         seq_change(q->seq);
2093
2094         error = 0;
2095     } else {
2096         dp_netdev_count_packet(dp, DP_STAT_LOST);
2097         ofpbuf_delete(packet);
2098         error = ENOBUFS;
2099     }
2100     ovs_mutex_unlock(&q->mutex);
2101     fat_rwlock_unlock(&dp->queue_rwlock);
2102
2103     return error;
2104 }
2105
2106 struct dp_netdev_execute_aux {
2107     struct dp_netdev *dp;
2108     const struct miniflow *key;
2109 };
2110
2111 static void
2112 dp_execute_cb(void *aux_, struct dpif_packet *packet,
2113               struct pkt_metadata *md,
2114               const struct nlattr *a, bool may_steal)
2115     OVS_NO_THREAD_SAFETY_ANALYSIS
2116 {
2117     struct dp_netdev_execute_aux *aux = aux_;
2118     int type = nl_attr_type(a);
2119     struct dp_netdev_port *p;
2120     uint32_t *depth = recirc_depth_get();
2121
2122     switch ((enum ovs_action_attr)type) {
2123     case OVS_ACTION_ATTR_OUTPUT:
2124         p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
2125         if (p) {
2126             netdev_send(p->netdev, &packet, 1, may_steal);
2127         }
2128         break;
2129
2130     case OVS_ACTION_ATTR_USERSPACE: {
2131         struct ofpbuf *userspace_packet;
2132         const struct nlattr *userdata;
2133
2134         userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
2135         userspace_packet = may_steal
2136                            ? &packet->ofpbuf
2137                            : ofpbuf_clone(&packet->ofpbuf);
2138
2139         dp_netdev_output_userspace(aux->dp, userspace_packet,
2140                                    miniflow_hash_5tuple(aux->key, 0)
2141                                        % aux->dp->n_handlers,
2142                                    DPIF_UC_ACTION, aux->key,
2143                                    userdata);
2144         break;
2145     }
2146
2147     case OVS_ACTION_ATTR_HASH: {
2148         const struct ovs_action_hash *hash_act;
2149         uint32_t hash;
2150
2151         hash_act = nl_attr_get(a);
2152         if (hash_act->hash_alg == OVS_HASH_ALG_L4) {
2153             /* Hash need not be symmetric, nor does it need to include
2154              * L2 fields. */
2155             hash = miniflow_hash_5tuple(aux->key, hash_act->hash_basis);
2156             if (!hash) {
2157                 hash = 1; /* 0 is not valid */
2158             }
2159
2160         } else {
2161             VLOG_WARN("Unknown hash algorithm specified for the hash action.");
2162             hash = 2;
2163         }
2164
2165         md->dp_hash = hash;
2166         break;
2167     }
2168
2169     case OVS_ACTION_ATTR_RECIRC:
2170         if (*depth < MAX_RECIRC_DEPTH) {
2171             struct pkt_metadata recirc_md = *md;
2172             struct dpif_packet *recirc_packet;
2173
2174             recirc_packet = may_steal ? packet : dpif_packet_clone(packet);
2175             recirc_md.recirc_id = nl_attr_get_u32(a);
2176
2177             (*depth)++;
2178             dp_netdev_input(aux->dp, recirc_packet, &recirc_md);
2179             (*depth)--;
2180
2181             break;
2182         } else {
2183             VLOG_WARN("Packet dropped. Max recirculation depth exceeded.");
2184         }
2185         break;
2186
2187     case OVS_ACTION_ATTR_PUSH_VLAN:
2188     case OVS_ACTION_ATTR_POP_VLAN:
2189     case OVS_ACTION_ATTR_PUSH_MPLS:
2190     case OVS_ACTION_ATTR_POP_MPLS:
2191     case OVS_ACTION_ATTR_SET:
2192     case OVS_ACTION_ATTR_SAMPLE:
2193     case OVS_ACTION_ATTR_UNSPEC:
2194     case __OVS_ACTION_ATTR_MAX:
2195         OVS_NOT_REACHED();
2196     }
2197 }
2198
2199 static void
2200 dp_netdev_execute_actions(struct dp_netdev *dp, const struct miniflow *key,
2201                           struct dpif_packet *packet, bool may_steal,
2202                           struct pkt_metadata *md,
2203                           const struct nlattr *actions, size_t actions_len)
2204 {
2205     struct dp_netdev_execute_aux aux = {dp, key};
2206
2207     odp_execute_actions(&aux, packet, may_steal, md,
2208                         actions, actions_len, dp_execute_cb);
2209 }
2210
2211 const struct dpif_class dpif_netdev_class = {
2212     "netdev",
2213     dpif_netdev_enumerate,
2214     dpif_netdev_port_open_type,
2215     dpif_netdev_open,
2216     dpif_netdev_close,
2217     dpif_netdev_destroy,
2218     dpif_netdev_run,
2219     dpif_netdev_wait,
2220     dpif_netdev_get_stats,
2221     dpif_netdev_port_add,
2222     dpif_netdev_port_del,
2223     dpif_netdev_port_query_by_number,
2224     dpif_netdev_port_query_by_name,
2225     NULL,                       /* port_get_pid */
2226     dpif_netdev_port_dump_start,
2227     dpif_netdev_port_dump_next,
2228     dpif_netdev_port_dump_done,
2229     dpif_netdev_port_poll,
2230     dpif_netdev_port_poll_wait,
2231     dpif_netdev_flow_get,
2232     dpif_netdev_flow_put,
2233     dpif_netdev_flow_del,
2234     dpif_netdev_flow_flush,
2235     dpif_netdev_flow_dump_create,
2236     dpif_netdev_flow_dump_destroy,
2237     dpif_netdev_flow_dump_thread_create,
2238     dpif_netdev_flow_dump_thread_destroy,
2239     dpif_netdev_flow_dump_next,
2240     dpif_netdev_execute,
2241     NULL,                       /* operate */
2242     dpif_netdev_recv_set,
2243     dpif_netdev_handlers_set,
2244     dpif_netdev_queue_to_priority,
2245     dpif_netdev_recv,
2246     dpif_netdev_recv_wait,
2247     dpif_netdev_recv_purge,
2248 };
2249
2250 static void
2251 dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
2252                               const char *argv[], void *aux OVS_UNUSED)
2253 {
2254     struct dp_netdev_port *old_port;
2255     struct dp_netdev_port *new_port;
2256     struct dp_netdev *dp;
2257     odp_port_t port_no;
2258
2259     ovs_mutex_lock(&dp_netdev_mutex);
2260     dp = shash_find_data(&dp_netdevs, argv[1]);
2261     if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
2262         ovs_mutex_unlock(&dp_netdev_mutex);
2263         unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
2264         return;
2265     }
2266     ovs_refcount_ref(&dp->ref_cnt);
2267     ovs_mutex_unlock(&dp_netdev_mutex);
2268
2269     ovs_mutex_lock(&dp->port_mutex);
2270     if (get_port_by_name(dp, argv[2], &old_port)) {
2271         unixctl_command_reply_error(conn, "unknown port");
2272         goto exit;
2273     }
2274
2275     port_no = u32_to_odp(atoi(argv[3]));
2276     if (!port_no || port_no == ODPP_NONE) {
2277         unixctl_command_reply_error(conn, "bad port number");
2278         goto exit;
2279     }
2280     if (dp_netdev_lookup_port(dp, port_no)) {
2281         unixctl_command_reply_error(conn, "port number already in use");
2282         goto exit;
2283     }
2284
2285     /* Remove old port. */
2286     cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no));
2287     ovsrcu_postpone(free, old_port);
2288
2289     /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */
2290     new_port = xmemdup(old_port, sizeof *old_port);
2291     new_port->port_no = port_no;
2292     cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no));
2293
2294     seq_change(dp->port_seq);
2295     unixctl_command_reply(conn, NULL);
2296
2297 exit:
2298     ovs_mutex_unlock(&dp->port_mutex);
2299     dp_netdev_unref(dp);
2300 }
2301
2302 static void
2303 dpif_dummy_delete_port(struct unixctl_conn *conn, int argc OVS_UNUSED,
2304                        const char *argv[], void *aux OVS_UNUSED)
2305 {
2306     struct dp_netdev_port *port;
2307     struct dp_netdev *dp;
2308
2309     ovs_mutex_lock(&dp_netdev_mutex);
2310     dp = shash_find_data(&dp_netdevs, argv[1]);
2311     if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
2312         ovs_mutex_unlock(&dp_netdev_mutex);
2313         unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
2314         return;
2315     }
2316     ovs_refcount_ref(&dp->ref_cnt);
2317     ovs_mutex_unlock(&dp_netdev_mutex);
2318
2319     ovs_mutex_lock(&dp->port_mutex);
2320     if (get_port_by_name(dp, argv[2], &port)) {
2321         unixctl_command_reply_error(conn, "unknown port");
2322     } else if (port->port_no == ODPP_LOCAL) {
2323         unixctl_command_reply_error(conn, "can't delete local port");
2324     } else {
2325         do_del_port(dp, port);
2326         unixctl_command_reply(conn, NULL);
2327     }
2328     ovs_mutex_unlock(&dp->port_mutex);
2329
2330     dp_netdev_unref(dp);
2331 }
2332
2333 static void
2334 dpif_dummy_register__(const char *type)
2335 {
2336     struct dpif_class *class;
2337
2338     class = xmalloc(sizeof *class);
2339     *class = dpif_netdev_class;
2340     class->type = xstrdup(type);
2341     dp_register_provider(class);
2342 }
2343
2344 void
2345 dpif_dummy_register(bool override)
2346 {
2347     if (override) {
2348         struct sset types;
2349         const char *type;
2350
2351         sset_init(&types);
2352         dp_enumerate_types(&types);
2353         SSET_FOR_EACH (type, &types) {
2354             if (!dp_unregister_provider(type)) {
2355                 dpif_dummy_register__(type);
2356             }
2357         }
2358         sset_destroy(&types);
2359     }
2360
2361     dpif_dummy_register__("dummy");
2362
2363     unixctl_command_register("dpif-dummy/change-port-number",
2364                              "DP PORT NEW-NUMBER",
2365                              3, 3, dpif_dummy_change_port_number, NULL);
2366     unixctl_command_register("dpif-dummy/delete-port", "DP PORT",
2367                              2, 2, dpif_dummy_delete_port, NULL);
2368 }