dpif-netdev: Use cmap instead of hmap.
[cascardo/ovs.git] / lib / dpif-netdev.c
1 /*
2  * Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #include <config.h>
18 #include "dpif.h"
19
20 #include <ctype.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <netinet/in.h>
25 #include <sys/socket.h>
26 #include <net/if.h>
27 #include <stdint.h>
28 #include <stdlib.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33
34 #include "classifier.h"
35 #include "cmap.h"
36 #include "csum.h"
37 #include "dpif.h"
38 #include "dpif-provider.h"
39 #include "dummy.h"
40 #include "dynamic-string.h"
41 #include "flow.h"
42 #include "cmap.h"
43 #include "latch.h"
44 #include "list.h"
45 #include "meta-flow.h"
46 #include "netdev.h"
47 #include "netdev-dpdk.h"
48 #include "netdev-vport.h"
49 #include "netlink.h"
50 #include "odp-execute.h"
51 #include "odp-util.h"
52 #include "ofp-print.h"
53 #include "ofpbuf.h"
54 #include "ovs-rcu.h"
55 #include "packet-dpif.h"
56 #include "packets.h"
57 #include "poll-loop.h"
58 #include "random.h"
59 #include "seq.h"
60 #include "shash.h"
61 #include "sset.h"
62 #include "timeval.h"
63 #include "unixctl.h"
64 #include "util.h"
65 #include "vlog.h"
66
67 VLOG_DEFINE_THIS_MODULE(dpif_netdev);
68
69 /* By default, choose a priority in the middle. */
70 #define NETDEV_RULE_PRIORITY 0x8000
71
72 #define FLOW_DUMP_MAX_BATCH 50
73 #define NR_THREADS 1
74 /* Use per thread recirc_depth to prevent recirculation loop. */
75 #define MAX_RECIRC_DEPTH 5
76 DEFINE_STATIC_PER_THREAD_DATA(uint32_t, recirc_depth, 0)
77
78 /* Configuration parameters. */
79 enum { MAX_FLOWS = 65536 };     /* Maximum number of flows in flow table. */
80
81 /* Queues. */
82 enum { MAX_QUEUE_LEN = 128 };   /* Maximum number of packets per queue. */
83 enum { QUEUE_MASK = MAX_QUEUE_LEN - 1 };
84 BUILD_ASSERT_DECL(IS_POW2(MAX_QUEUE_LEN));
85
86 /* Protects against changes to 'dp_netdevs'. */
87 static struct ovs_mutex dp_netdev_mutex = OVS_MUTEX_INITIALIZER;
88
89 /* Contains all 'struct dp_netdev's. */
90 static struct shash dp_netdevs OVS_GUARDED_BY(dp_netdev_mutex)
91     = SHASH_INITIALIZER(&dp_netdevs);
92
93 struct dp_netdev_upcall {
94     struct dpif_upcall upcall;  /* Queued upcall information. */
95     struct ofpbuf buf;          /* ofpbuf instance for upcall.packet. */
96 };
97
98 /* A queue passing packets from a struct dp_netdev to its clients (handlers).
99  *
100  *
101  * Thread-safety
102  * =============
103  *
104  * Any access at all requires the owning 'dp_netdev''s queue_rwlock and
105  * its own mutex. */
106 struct dp_netdev_queue {
107     struct ovs_mutex mutex;
108     struct seq *seq;      /* Incremented whenever a packet is queued. */
109     struct dp_netdev_upcall upcalls[MAX_QUEUE_LEN] OVS_GUARDED;
110     unsigned int head OVS_GUARDED;
111     unsigned int tail OVS_GUARDED;
112 };
113
114 /* Datapath based on the network device interface from netdev.h.
115  *
116  *
117  * Thread-safety
118  * =============
119  *
120  * Some members, marked 'const', are immutable.  Accessing other members
121  * requires synchronization, as noted in more detail below.
122  *
123  * Acquisition order is, from outermost to innermost:
124  *
125  *    dp_netdev_mutex (global)
126  *    port_mutex
127  *    flow_mutex
128  *    cls.rwlock
129  *    queue_rwlock
130  */
131 struct dp_netdev {
132     const struct dpif_class *const class;
133     const char *const name;
134     struct ovs_refcount ref_cnt;
135     atomic_flag destroyed;
136
137     /* Flows.
138      *
139      * Readers of 'cls' must take a 'cls->rwlock' read lock.
140      *
141      * Writers of 'flow_table' must take the 'flow_mutex'.
142      *
143      * Writers of 'cls' must take the 'flow_mutex' and then the 'cls->rwlock'
144      * write lock.  (The outer 'flow_mutex' allows writers to atomically
145      * perform multiple operations on 'cls' and 'flow_table'.)
146      */
147     struct ovs_mutex flow_mutex;
148     struct classifier cls;      /* Classifier.  Protected by cls.rwlock. */
149     struct cmap flow_table OVS_GUARDED; /* Flow table. */
150
151     /* Queues.
152      *
153      * 'queue_rwlock' protects the modification of 'handler_queues' and
154      * 'n_handlers'.  The queue elements are protected by its
155      * 'handler_queues''s mutex. */
156     struct fat_rwlock queue_rwlock;
157     struct dp_netdev_queue *handler_queues;
158     uint32_t n_handlers;
159
160     /* Statistics.
161      *
162      * ovsthread_stats is internally synchronized. */
163     struct ovsthread_stats stats; /* Contains 'struct dp_netdev_stats *'. */
164
165     /* Ports.
166      *
167      * Protected by RCU.  Take the mutex to add or remove ports. */
168     struct ovs_mutex port_mutex;
169     struct cmap ports;
170     struct seq *port_seq;       /* Incremented whenever a port changes. */
171
172     /* Forwarding threads. */
173     struct latch exit_latch;
174     struct pmd_thread *pmd_threads;
175     size_t n_pmd_threads;
176     int pmd_count;
177 };
178
179 static struct dp_netdev_port *dp_netdev_lookup_port(const struct dp_netdev *dp,
180                                                     odp_port_t);
181
182 enum dp_stat_type {
183     DP_STAT_HIT,                /* Packets that matched in the flow table. */
184     DP_STAT_MISS,               /* Packets that did not match. */
185     DP_STAT_LOST,               /* Packets not passed up to the client. */
186     DP_N_STATS
187 };
188
189 /* Contained by struct dp_netdev's 'stats' member.  */
190 struct dp_netdev_stats {
191     struct ovs_mutex mutex;          /* Protects 'n'. */
192
193     /* Indexed by DP_STAT_*, protected by 'mutex'. */
194     unsigned long long int n[DP_N_STATS] OVS_GUARDED;
195 };
196
197
198 /* A port in a netdev-based datapath. */
199 struct dp_netdev_port {
200     struct cmap_node node;      /* Node in dp_netdev's 'ports'. */
201     odp_port_t port_no;
202     struct netdev *netdev;
203     struct netdev_saved_flags *sf;
204     struct netdev_rxq **rxq;
205     struct ovs_refcount ref_cnt;
206     char *type;                 /* Port type as requested by user. */
207 };
208
209
210 /* Stores a miniflow */
211
212 /* There are fields in the flow structure that we never use. Therefore we can
213  * save a few words of memory */
214 #define NETDEV_KEY_BUF_SIZE_U32 (FLOW_U32S \
215                                  - FLOW_U32_SIZE(regs) \
216                                  - FLOW_U32_SIZE(metadata) \
217                                 )
218 struct netdev_flow_key {
219     struct miniflow flow;
220     uint32_t buf[NETDEV_KEY_BUF_SIZE_U32];
221 };
222
223 /* A flow in dp_netdev's 'flow_table'.
224  *
225  *
226  * Thread-safety
227  * =============
228  *
229  * Except near the beginning or ending of its lifespan, rule 'rule' belongs to
230  * its dp_netdev's classifier.  The text below calls this classifier 'cls'.
231  *
232  * Motivation
233  * ----------
234  *
235  * The thread safety rules described here for "struct dp_netdev_flow" are
236  * motivated by two goals:
237  *
238  *    - Prevent threads that read members of "struct dp_netdev_flow" from
239  *      reading bad data due to changes by some thread concurrently modifying
240  *      those members.
241  *
242  *    - Prevent two threads making changes to members of a given "struct
243  *      dp_netdev_flow" from interfering with each other.
244  *
245  *
246  * Rules
247  * -----
248  *
249  * A flow 'flow' may be accessed without a risk of being freed by code that
250  * holds a read-lock or write-lock on 'cls->rwlock' or that owns a reference to
251  * 'flow->ref_cnt' (or both).  Code that needs to hold onto a flow for a while
252  * should take 'cls->rwlock', find the flow it needs, increment 'flow->ref_cnt'
253  * with dpif_netdev_flow_ref(), and drop 'cls->rwlock'.
254  *
255  * 'flow->ref_cnt' protects 'flow' from being freed.  It doesn't protect the
256  * flow from being deleted from 'cls' (that's 'cls->rwlock') and it doesn't
257  * protect members of 'flow' from modification.
258  *
259  * Some members, marked 'const', are immutable.  Accessing other members
260  * requires synchronization, as noted in more detail below.
261  */
262 struct dp_netdev_flow {
263     /* Packet classification. */
264     const struct cls_rule cr;   /* In owning dp_netdev's 'cls'. */
265
266     /* Hash table index by unmasked flow. */
267     const struct cmap_node node; /* In owning dp_netdev's 'flow_table'. */
268     const struct flow flow;      /* The flow that created this entry. */
269
270     /* Statistics.
271      *
272      * Reading or writing these members requires 'mutex'. */
273     struct ovsthread_stats stats; /* Contains "struct dp_netdev_flow_stats". */
274
275     /* Actions. */
276     OVSRCU_TYPE(struct dp_netdev_actions *) actions;
277 };
278
279 static void dp_netdev_flow_free(struct dp_netdev_flow *);
280
281 /* Contained by struct dp_netdev_flow's 'stats' member.  */
282 struct dp_netdev_flow_stats {
283     struct ovs_mutex mutex;         /* Guards all the other members. */
284
285     long long int used OVS_GUARDED; /* Last used time, in monotonic msecs. */
286     long long int packet_count OVS_GUARDED; /* Number of packets matched. */
287     long long int byte_count OVS_GUARDED;   /* Number of bytes matched. */
288     uint16_t tcp_flags OVS_GUARDED; /* Bitwise-OR of seen tcp_flags values. */
289 };
290
291 /* A set of datapath actions within a "struct dp_netdev_flow".
292  *
293  *
294  * Thread-safety
295  * =============
296  *
297  * A struct dp_netdev_actions 'actions' is protected with RCU. */
298 struct dp_netdev_actions {
299     /* These members are immutable: they do not change during the struct's
300      * lifetime.  */
301     struct nlattr *actions;     /* Sequence of OVS_ACTION_ATTR_* attributes. */
302     unsigned int size;          /* Size of 'actions', in bytes. */
303 };
304
305 struct dp_netdev_actions *dp_netdev_actions_create(const struct nlattr *,
306                                                    size_t);
307 struct dp_netdev_actions *dp_netdev_flow_get_actions(
308     const struct dp_netdev_flow *);
309 static void dp_netdev_actions_free(struct dp_netdev_actions *);
310
311 /* PMD: Poll modes drivers.  PMD accesses devices via polling to eliminate
312  * the performance overhead of interrupt processing.  Therefore netdev can
313  * not implement rx-wait for these devices.  dpif-netdev needs to poll
314  * these device to check for recv buffer.  pmd-thread does polling for
315  * devices assigned to itself thread.
316  *
317  * DPDK used PMD for accessing NIC.
318  *
319  * A thread that receives packets from PMD ports, looks them up in the flow
320  * table, and executes the actions it finds.
321  **/
322 struct pmd_thread {
323     struct dp_netdev *dp;
324     pthread_t thread;
325     int id;
326     atomic_uint change_seq;
327 };
328
329 /* Interface to netdev-based datapath. */
330 struct dpif_netdev {
331     struct dpif dpif;
332     struct dp_netdev *dp;
333     uint64_t last_port_seq;
334 };
335
336 static int get_port_by_number(struct dp_netdev *dp, odp_port_t port_no,
337                               struct dp_netdev_port **portp);
338 static int get_port_by_name(struct dp_netdev *dp, const char *devname,
339                             struct dp_netdev_port **portp);
340 static void dp_netdev_free(struct dp_netdev *)
341     OVS_REQUIRES(dp_netdev_mutex);
342 static void dp_netdev_flow_flush(struct dp_netdev *);
343 static int do_add_port(struct dp_netdev *dp, const char *devname,
344                        const char *type, odp_port_t port_no)
345     OVS_REQUIRES(dp->port_mutex);
346 static void do_del_port(struct dp_netdev *dp, struct dp_netdev_port *)
347     OVS_REQUIRES(dp->port_mutex);
348 static void dp_netdev_destroy_all_queues(struct dp_netdev *dp)
349     OVS_REQ_WRLOCK(dp->queue_rwlock);
350 static int dpif_netdev_open(const struct dpif_class *, const char *name,
351                             bool create, struct dpif **);
352 static int dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf **,
353                                       int cnt, int queue_no, int type,
354                                       const struct miniflow *,
355                                       const struct nlattr *userdata);
356 static void dp_netdev_execute_actions(struct dp_netdev *dp,
357                                       struct dpif_packet **, int c,
358                                       bool may_steal, struct pkt_metadata *,
359                                       const struct nlattr *actions,
360                                       size_t actions_len);
361 static void dp_netdev_port_input(struct dp_netdev *dp,
362                                  struct dpif_packet **packets, int cnt,
363                                  odp_port_t port_no);
364
365 static void dp_netdev_set_pmd_threads(struct dp_netdev *, int n);
366
367 static struct dpif_netdev *
368 dpif_netdev_cast(const struct dpif *dpif)
369 {
370     ovs_assert(dpif->dpif_class->open == dpif_netdev_open);
371     return CONTAINER_OF(dpif, struct dpif_netdev, dpif);
372 }
373
374 static struct dp_netdev *
375 get_dp_netdev(const struct dpif *dpif)
376 {
377     return dpif_netdev_cast(dpif)->dp;
378 }
379
380 static int
381 dpif_netdev_enumerate(struct sset *all_dps,
382                       const struct dpif_class *dpif_class)
383 {
384     struct shash_node *node;
385
386     ovs_mutex_lock(&dp_netdev_mutex);
387     SHASH_FOR_EACH(node, &dp_netdevs) {
388         struct dp_netdev *dp = node->data;
389         if (dpif_class != dp->class) {
390             /* 'dp_netdevs' contains both "netdev" and "dummy" dpifs.
391              * If the class doesn't match, skip this dpif. */
392              continue;
393         }
394         sset_add(all_dps, node->name);
395     }
396     ovs_mutex_unlock(&dp_netdev_mutex);
397
398     return 0;
399 }
400
401 static bool
402 dpif_netdev_class_is_dummy(const struct dpif_class *class)
403 {
404     return class != &dpif_netdev_class;
405 }
406
407 static const char *
408 dpif_netdev_port_open_type(const struct dpif_class *class, const char *type)
409 {
410     return strcmp(type, "internal") ? type
411                   : dpif_netdev_class_is_dummy(class) ? "dummy"
412                   : "tap";
413 }
414
415 static struct dpif *
416 create_dpif_netdev(struct dp_netdev *dp)
417 {
418     uint16_t netflow_id = hash_string(dp->name, 0);
419     struct dpif_netdev *dpif;
420
421     ovs_refcount_ref(&dp->ref_cnt);
422
423     dpif = xmalloc(sizeof *dpif);
424     dpif_init(&dpif->dpif, dp->class, dp->name, netflow_id >> 8, netflow_id);
425     dpif->dp = dp;
426     dpif->last_port_seq = seq_read(dp->port_seq);
427
428     return &dpif->dpif;
429 }
430
431 /* Choose an unused, non-zero port number and return it on success.
432  * Return ODPP_NONE on failure. */
433 static odp_port_t
434 choose_port(struct dp_netdev *dp, const char *name)
435     OVS_REQUIRES(dp->port_mutex)
436 {
437     uint32_t port_no;
438
439     if (dp->class != &dpif_netdev_class) {
440         const char *p;
441         int start_no = 0;
442
443         /* If the port name begins with "br", start the number search at
444          * 100 to make writing tests easier. */
445         if (!strncmp(name, "br", 2)) {
446             start_no = 100;
447         }
448
449         /* If the port name contains a number, try to assign that port number.
450          * This can make writing unit tests easier because port numbers are
451          * predictable. */
452         for (p = name; *p != '\0'; p++) {
453             if (isdigit((unsigned char) *p)) {
454                 port_no = start_no + strtol(p, NULL, 10);
455                 if (port_no > 0 && port_no != odp_to_u32(ODPP_NONE)
456                     && !dp_netdev_lookup_port(dp, u32_to_odp(port_no))) {
457                     return u32_to_odp(port_no);
458                 }
459                 break;
460             }
461         }
462     }
463
464     for (port_no = 1; port_no <= UINT16_MAX; port_no++) {
465         if (!dp_netdev_lookup_port(dp, u32_to_odp(port_no))) {
466             return u32_to_odp(port_no);
467         }
468     }
469
470     return ODPP_NONE;
471 }
472
473 static int
474 create_dp_netdev(const char *name, const struct dpif_class *class,
475                  struct dp_netdev **dpp)
476     OVS_REQUIRES(dp_netdev_mutex)
477 {
478     struct dp_netdev *dp;
479     int error;
480
481     dp = xzalloc(sizeof *dp);
482     shash_add(&dp_netdevs, name, dp);
483
484     *CONST_CAST(const struct dpif_class **, &dp->class) = class;
485     *CONST_CAST(const char **, &dp->name) = xstrdup(name);
486     ovs_refcount_init(&dp->ref_cnt);
487     atomic_flag_clear(&dp->destroyed);
488
489     ovs_mutex_init(&dp->flow_mutex);
490     classifier_init(&dp->cls, NULL);
491     cmap_init(&dp->flow_table);
492
493     fat_rwlock_init(&dp->queue_rwlock);
494
495     ovsthread_stats_init(&dp->stats);
496
497     ovs_mutex_init(&dp->port_mutex);
498     cmap_init(&dp->ports);
499     dp->port_seq = seq_create();
500     latch_init(&dp->exit_latch);
501
502     ovs_mutex_lock(&dp->port_mutex);
503     error = do_add_port(dp, name, "internal", ODPP_LOCAL);
504     ovs_mutex_unlock(&dp->port_mutex);
505     if (error) {
506         dp_netdev_free(dp);
507         return error;
508     }
509
510     *dpp = dp;
511     return 0;
512 }
513
514 static int
515 dpif_netdev_open(const struct dpif_class *class, const char *name,
516                  bool create, struct dpif **dpifp)
517 {
518     struct dp_netdev *dp;
519     int error;
520
521     ovs_mutex_lock(&dp_netdev_mutex);
522     dp = shash_find_data(&dp_netdevs, name);
523     if (!dp) {
524         error = create ? create_dp_netdev(name, class, &dp) : ENODEV;
525     } else {
526         error = (dp->class != class ? EINVAL
527                  : create ? EEXIST
528                  : 0);
529     }
530     if (!error) {
531         *dpifp = create_dpif_netdev(dp);
532     }
533     ovs_mutex_unlock(&dp_netdev_mutex);
534
535     return error;
536 }
537
538 static void
539 dp_netdev_purge_queues(struct dp_netdev *dp)
540     OVS_REQ_WRLOCK(dp->queue_rwlock)
541 {
542     int i;
543
544     for (i = 0; i < dp->n_handlers; i++) {
545         struct dp_netdev_queue *q = &dp->handler_queues[i];
546
547         ovs_mutex_lock(&q->mutex);
548         while (q->tail != q->head) {
549             struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
550             ofpbuf_uninit(&u->upcall.packet);
551             ofpbuf_uninit(&u->buf);
552         }
553         ovs_mutex_unlock(&q->mutex);
554     }
555 }
556
557 /* Requires dp_netdev_mutex so that we can't get a new reference to 'dp'
558  * through the 'dp_netdevs' shash while freeing 'dp'. */
559 static void
560 dp_netdev_free(struct dp_netdev *dp)
561     OVS_REQUIRES(dp_netdev_mutex)
562 {
563     struct dp_netdev_port *port;
564     struct dp_netdev_stats *bucket;
565     int i;
566
567     shash_find_and_delete(&dp_netdevs, dp->name);
568
569     dp_netdev_set_pmd_threads(dp, 0);
570     free(dp->pmd_threads);
571
572     dp_netdev_flow_flush(dp);
573     ovs_mutex_lock(&dp->port_mutex);
574     CMAP_FOR_EACH (port, node, &dp->ports) {
575         do_del_port(dp, port);
576     }
577     ovs_mutex_unlock(&dp->port_mutex);
578
579     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
580         ovs_mutex_destroy(&bucket->mutex);
581         free_cacheline(bucket);
582     }
583     ovsthread_stats_destroy(&dp->stats);
584
585     fat_rwlock_wrlock(&dp->queue_rwlock);
586     dp_netdev_destroy_all_queues(dp);
587     fat_rwlock_unlock(&dp->queue_rwlock);
588
589     fat_rwlock_destroy(&dp->queue_rwlock);
590
591     classifier_destroy(&dp->cls);
592     cmap_destroy(&dp->flow_table);
593     ovs_mutex_destroy(&dp->flow_mutex);
594     seq_destroy(dp->port_seq);
595     cmap_destroy(&dp->ports);
596     latch_destroy(&dp->exit_latch);
597     free(CONST_CAST(char *, dp->name));
598     free(dp);
599 }
600
601 static void
602 dp_netdev_unref(struct dp_netdev *dp)
603 {
604     if (dp) {
605         /* Take dp_netdev_mutex so that, if dp->ref_cnt falls to zero, we can't
606          * get a new reference to 'dp' through the 'dp_netdevs' shash. */
607         ovs_mutex_lock(&dp_netdev_mutex);
608         if (ovs_refcount_unref(&dp->ref_cnt) == 1) {
609             dp_netdev_free(dp);
610         }
611         ovs_mutex_unlock(&dp_netdev_mutex);
612     }
613 }
614
615 static void
616 dpif_netdev_close(struct dpif *dpif)
617 {
618     struct dp_netdev *dp = get_dp_netdev(dpif);
619
620     dp_netdev_unref(dp);
621     free(dpif);
622 }
623
624 static int
625 dpif_netdev_destroy(struct dpif *dpif)
626 {
627     struct dp_netdev *dp = get_dp_netdev(dpif);
628
629     if (!atomic_flag_test_and_set(&dp->destroyed)) {
630         if (ovs_refcount_unref(&dp->ref_cnt) == 1) {
631             /* Can't happen: 'dpif' still owns a reference to 'dp'. */
632             OVS_NOT_REACHED();
633         }
634     }
635
636     return 0;
637 }
638
639 static int
640 dpif_netdev_get_stats(const struct dpif *dpif, struct dpif_dp_stats *stats)
641 {
642     struct dp_netdev *dp = get_dp_netdev(dpif);
643     struct dp_netdev_stats *bucket;
644     size_t i;
645
646     stats->n_flows = cmap_count(&dp->flow_table);
647
648     stats->n_hit = stats->n_missed = stats->n_lost = 0;
649     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &dp->stats) {
650         ovs_mutex_lock(&bucket->mutex);
651         stats->n_hit += bucket->n[DP_STAT_HIT];
652         stats->n_missed += bucket->n[DP_STAT_MISS];
653         stats->n_lost += bucket->n[DP_STAT_LOST];
654         ovs_mutex_unlock(&bucket->mutex);
655     }
656     stats->n_masks = UINT32_MAX;
657     stats->n_mask_hit = UINT64_MAX;
658
659     return 0;
660 }
661
662 static void
663 dp_netdev_reload_pmd_threads(struct dp_netdev *dp)
664 {
665     int i;
666
667     for (i = 0; i < dp->n_pmd_threads; i++) {
668         struct pmd_thread *f = &dp->pmd_threads[i];
669         int id;
670
671         atomic_add(&f->change_seq, 1, &id);
672    }
673 }
674
675 static uint32_t
676 hash_port_no(odp_port_t port_no)
677 {
678     return hash_int(odp_to_u32(port_no), 0);
679 }
680
681 static int
682 do_add_port(struct dp_netdev *dp, const char *devname, const char *type,
683             odp_port_t port_no)
684     OVS_REQUIRES(dp->port_mutex)
685 {
686     struct netdev_saved_flags *sf;
687     struct dp_netdev_port *port;
688     struct netdev *netdev;
689     enum netdev_flags flags;
690     const char *open_type;
691     int error;
692     int i;
693
694     /* XXX reject devices already in some dp_netdev. */
695
696     /* Open and validate network device. */
697     open_type = dpif_netdev_port_open_type(dp->class, type);
698     error = netdev_open(devname, open_type, &netdev);
699     if (error) {
700         return error;
701     }
702     /* XXX reject non-Ethernet devices */
703
704     netdev_get_flags(netdev, &flags);
705     if (flags & NETDEV_LOOPBACK) {
706         VLOG_ERR("%s: cannot add a loopback device", devname);
707         netdev_close(netdev);
708         return EINVAL;
709     }
710
711     port = xzalloc(sizeof *port);
712     port->port_no = port_no;
713     port->netdev = netdev;
714     port->rxq = xmalloc(sizeof *port->rxq * netdev_n_rxq(netdev));
715     port->type = xstrdup(type);
716     for (i = 0; i < netdev_n_rxq(netdev); i++) {
717         error = netdev_rxq_open(netdev, &port->rxq[i], i);
718         if (error
719             && !(error == EOPNOTSUPP && dpif_netdev_class_is_dummy(dp->class))) {
720             VLOG_ERR("%s: cannot receive packets on this network device (%s)",
721                      devname, ovs_strerror(errno));
722             netdev_close(netdev);
723             return error;
724         }
725     }
726
727     error = netdev_turn_flags_on(netdev, NETDEV_PROMISC, &sf);
728     if (error) {
729         for (i = 0; i < netdev_n_rxq(netdev); i++) {
730             netdev_rxq_close(port->rxq[i]);
731         }
732         netdev_close(netdev);
733         free(port->rxq);
734         free(port);
735         return error;
736     }
737     port->sf = sf;
738
739     if (netdev_is_pmd(netdev)) {
740         dp->pmd_count++;
741         dp_netdev_set_pmd_threads(dp, NR_THREADS);
742         dp_netdev_reload_pmd_threads(dp);
743     }
744     ovs_refcount_init(&port->ref_cnt);
745
746     cmap_insert(&dp->ports, &port->node, hash_port_no(port_no));
747     seq_change(dp->port_seq);
748
749     return 0;
750 }
751
752 static int
753 dpif_netdev_port_add(struct dpif *dpif, struct netdev *netdev,
754                      odp_port_t *port_nop)
755 {
756     struct dp_netdev *dp = get_dp_netdev(dpif);
757     char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
758     const char *dpif_port;
759     odp_port_t port_no;
760     int error;
761
762     ovs_mutex_lock(&dp->port_mutex);
763     dpif_port = netdev_vport_get_dpif_port(netdev, namebuf, sizeof namebuf);
764     if (*port_nop != ODPP_NONE) {
765         port_no = *port_nop;
766         error = dp_netdev_lookup_port(dp, *port_nop) ? EBUSY : 0;
767     } else {
768         port_no = choose_port(dp, dpif_port);
769         error = port_no == ODPP_NONE ? EFBIG : 0;
770     }
771     if (!error) {
772         *port_nop = port_no;
773         error = do_add_port(dp, dpif_port, netdev_get_type(netdev), port_no);
774     }
775     ovs_mutex_unlock(&dp->port_mutex);
776
777     return error;
778 }
779
780 static int
781 dpif_netdev_port_del(struct dpif *dpif, odp_port_t port_no)
782 {
783     struct dp_netdev *dp = get_dp_netdev(dpif);
784     int error;
785
786     ovs_mutex_lock(&dp->port_mutex);
787     if (port_no == ODPP_LOCAL) {
788         error = EINVAL;
789     } else {
790         struct dp_netdev_port *port;
791
792         error = get_port_by_number(dp, port_no, &port);
793         if (!error) {
794             do_del_port(dp, port);
795         }
796     }
797     ovs_mutex_unlock(&dp->port_mutex);
798
799     return error;
800 }
801
802 static bool
803 is_valid_port_number(odp_port_t port_no)
804 {
805     return port_no != ODPP_NONE;
806 }
807
808 static struct dp_netdev_port *
809 dp_netdev_lookup_port(const struct dp_netdev *dp, odp_port_t port_no)
810 {
811     struct dp_netdev_port *port;
812
813     CMAP_FOR_EACH_WITH_HASH (port, node, hash_port_no(port_no), &dp->ports) {
814         if (port->port_no == port_no) {
815             return port;
816         }
817     }
818     return NULL;
819 }
820
821 static int
822 get_port_by_number(struct dp_netdev *dp,
823                    odp_port_t port_no, struct dp_netdev_port **portp)
824 {
825     if (!is_valid_port_number(port_no)) {
826         *portp = NULL;
827         return EINVAL;
828     } else {
829         *portp = dp_netdev_lookup_port(dp, port_no);
830         return *portp ? 0 : ENOENT;
831     }
832 }
833
834 static void
835 port_ref(struct dp_netdev_port *port)
836 {
837     if (port) {
838         ovs_refcount_ref(&port->ref_cnt);
839     }
840 }
841
842 static void
843 port_destroy__(struct dp_netdev_port *port)
844 {
845     int n_rxq = netdev_n_rxq(port->netdev);
846     int i;
847
848     netdev_close(port->netdev);
849     netdev_restore_flags(port->sf);
850
851     for (i = 0; i < n_rxq; i++) {
852         netdev_rxq_close(port->rxq[i]);
853     }
854     free(port->rxq);
855     free(port->type);
856     free(port);
857 }
858
859 static void
860 port_unref(struct dp_netdev_port *port)
861 {
862     if (port && ovs_refcount_unref(&port->ref_cnt) == 1) {
863         ovsrcu_postpone(port_destroy__, port);
864     }
865 }
866
867 static int
868 get_port_by_name(struct dp_netdev *dp,
869                  const char *devname, struct dp_netdev_port **portp)
870     OVS_REQUIRES(dp->port_mutex)
871 {
872     struct dp_netdev_port *port;
873
874     CMAP_FOR_EACH (port, node, &dp->ports) {
875         if (!strcmp(netdev_get_name(port->netdev), devname)) {
876             *portp = port;
877             return 0;
878         }
879     }
880     return ENOENT;
881 }
882
883 static void
884 do_del_port(struct dp_netdev *dp, struct dp_netdev_port *port)
885     OVS_REQUIRES(dp->port_mutex)
886 {
887     cmap_remove(&dp->ports, &port->node, hash_odp_port(port->port_no));
888     seq_change(dp->port_seq);
889     if (netdev_is_pmd(port->netdev)) {
890         dp_netdev_reload_pmd_threads(dp);
891     }
892
893     port_unref(port);
894 }
895
896 static void
897 answer_port_query(const struct dp_netdev_port *port,
898                   struct dpif_port *dpif_port)
899 {
900     dpif_port->name = xstrdup(netdev_get_name(port->netdev));
901     dpif_port->type = xstrdup(port->type);
902     dpif_port->port_no = port->port_no;
903 }
904
905 static int
906 dpif_netdev_port_query_by_number(const struct dpif *dpif, odp_port_t port_no,
907                                  struct dpif_port *dpif_port)
908 {
909     struct dp_netdev *dp = get_dp_netdev(dpif);
910     struct dp_netdev_port *port;
911     int error;
912
913     error = get_port_by_number(dp, port_no, &port);
914     if (!error && dpif_port) {
915         answer_port_query(port, dpif_port);
916     }
917
918     return error;
919 }
920
921 static int
922 dpif_netdev_port_query_by_name(const struct dpif *dpif, const char *devname,
923                                struct dpif_port *dpif_port)
924 {
925     struct dp_netdev *dp = get_dp_netdev(dpif);
926     struct dp_netdev_port *port;
927     int error;
928
929     ovs_mutex_lock(&dp->port_mutex);
930     error = get_port_by_name(dp, devname, &port);
931     if (!error && dpif_port) {
932         answer_port_query(port, dpif_port);
933     }
934     ovs_mutex_unlock(&dp->port_mutex);
935
936     return error;
937 }
938
939 static void
940 dp_netdev_flow_free(struct dp_netdev_flow *flow)
941 {
942     struct dp_netdev_flow_stats *bucket;
943     size_t i;
944
945     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &flow->stats) {
946         ovs_mutex_destroy(&bucket->mutex);
947         free_cacheline(bucket);
948     }
949     ovsthread_stats_destroy(&flow->stats);
950
951     cls_rule_destroy(CONST_CAST(struct cls_rule *, &flow->cr));
952     dp_netdev_actions_free(dp_netdev_flow_get_actions(flow));
953     free(flow);
954 }
955
956 static void
957 dp_netdev_remove_flow(struct dp_netdev *dp, struct dp_netdev_flow *flow)
958     OVS_REQ_WRLOCK(dp->cls.rwlock)
959     OVS_REQUIRES(dp->flow_mutex)
960 {
961     struct cls_rule *cr = CONST_CAST(struct cls_rule *, &flow->cr);
962     struct cmap_node *node = CONST_CAST(struct cmap_node *, &flow->node);
963
964     classifier_remove(&dp->cls, cr);
965     cmap_remove(&dp->flow_table, node, flow_hash(&flow->flow, 0));
966     ovsrcu_postpone(dp_netdev_flow_free, flow);
967 }
968
969 static void
970 dp_netdev_flow_flush(struct dp_netdev *dp)
971 {
972     struct dp_netdev_flow *netdev_flow, *next;
973
974     ovs_mutex_lock(&dp->flow_mutex);
975     fat_rwlock_wrlock(&dp->cls.rwlock);
976     CMAP_FOR_EACH_SAFE (netdev_flow, next, node, &dp->flow_table) {
977         dp_netdev_remove_flow(dp, netdev_flow);
978     }
979     fat_rwlock_unlock(&dp->cls.rwlock);
980     ovs_mutex_unlock(&dp->flow_mutex);
981 }
982
983 static int
984 dpif_netdev_flow_flush(struct dpif *dpif)
985 {
986     struct dp_netdev *dp = get_dp_netdev(dpif);
987
988     dp_netdev_flow_flush(dp);
989     return 0;
990 }
991
992 struct dp_netdev_port_state {
993     struct cmap_position position;
994     char *name;
995 };
996
997 static int
998 dpif_netdev_port_dump_start(const struct dpif *dpif OVS_UNUSED, void **statep)
999 {
1000     *statep = xzalloc(sizeof(struct dp_netdev_port_state));
1001     return 0;
1002 }
1003
1004 static int
1005 dpif_netdev_port_dump_next(const struct dpif *dpif, void *state_,
1006                            struct dpif_port *dpif_port)
1007 {
1008     struct dp_netdev_port_state *state = state_;
1009     struct dp_netdev *dp = get_dp_netdev(dpif);
1010     struct cmap_node *node;
1011     int retval;
1012
1013     node = cmap_next_position(&dp->ports, &state->position);
1014     if (node) {
1015         struct dp_netdev_port *port;
1016
1017         port = CONTAINER_OF(node, struct dp_netdev_port, node);
1018
1019         free(state->name);
1020         state->name = xstrdup(netdev_get_name(port->netdev));
1021         dpif_port->name = state->name;
1022         dpif_port->type = port->type;
1023         dpif_port->port_no = port->port_no;
1024
1025         retval = 0;
1026     } else {
1027         retval = EOF;
1028     }
1029
1030     return retval;
1031 }
1032
1033 static int
1034 dpif_netdev_port_dump_done(const struct dpif *dpif OVS_UNUSED, void *state_)
1035 {
1036     struct dp_netdev_port_state *state = state_;
1037     free(state->name);
1038     free(state);
1039     return 0;
1040 }
1041
1042 static int
1043 dpif_netdev_port_poll(const struct dpif *dpif_, char **devnamep OVS_UNUSED)
1044 {
1045     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
1046     uint64_t new_port_seq;
1047     int error;
1048
1049     new_port_seq = seq_read(dpif->dp->port_seq);
1050     if (dpif->last_port_seq != new_port_seq) {
1051         dpif->last_port_seq = new_port_seq;
1052         error = ENOBUFS;
1053     } else {
1054         error = EAGAIN;
1055     }
1056
1057     return error;
1058 }
1059
1060 static void
1061 dpif_netdev_port_poll_wait(const struct dpif *dpif_)
1062 {
1063     struct dpif_netdev *dpif = dpif_netdev_cast(dpif_);
1064
1065     seq_wait(dpif->dp->port_seq, dpif->last_port_seq);
1066 }
1067
1068 static struct dp_netdev_flow *
1069 dp_netdev_flow_cast(const struct cls_rule *cr)
1070 {
1071     return cr ? CONTAINER_OF(cr, struct dp_netdev_flow, cr) : NULL;
1072 }
1073
1074 static struct dp_netdev_flow *
1075 dp_netdev_lookup_flow(const struct dp_netdev *dp, const struct miniflow *key)
1076     OVS_REQ_RDLOCK(dp->cls.rwlock)
1077 {
1078     struct dp_netdev_flow *netdev_flow;
1079     struct cls_rule *rule;
1080
1081     classifier_lookup_miniflow_batch(&dp->cls, &key, &rule, 1);
1082     netdev_flow = dp_netdev_flow_cast(rule);
1083
1084     return netdev_flow;
1085 }
1086
1087 static struct dp_netdev_flow *
1088 dp_netdev_find_flow(const struct dp_netdev *dp, const struct flow *flow)
1089 {
1090     struct dp_netdev_flow *netdev_flow;
1091
1092     CMAP_FOR_EACH_WITH_HASH (netdev_flow, node, flow_hash(flow, 0),
1093                              &dp->flow_table) {
1094         if (flow_equal(&netdev_flow->flow, flow)) {
1095             return netdev_flow;
1096         }
1097     }
1098
1099     return NULL;
1100 }
1101
1102 static void
1103 get_dpif_flow_stats(struct dp_netdev_flow *netdev_flow,
1104                     struct dpif_flow_stats *stats)
1105 {
1106     struct dp_netdev_flow_stats *bucket;
1107     size_t i;
1108
1109     memset(stats, 0, sizeof *stats);
1110     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
1111         ovs_mutex_lock(&bucket->mutex);
1112         stats->n_packets += bucket->packet_count;
1113         stats->n_bytes += bucket->byte_count;
1114         stats->used = MAX(stats->used, bucket->used);
1115         stats->tcp_flags |= bucket->tcp_flags;
1116         ovs_mutex_unlock(&bucket->mutex);
1117     }
1118 }
1119
1120 static int
1121 dpif_netdev_mask_from_nlattrs(const struct nlattr *key, uint32_t key_len,
1122                               const struct nlattr *mask_key,
1123                               uint32_t mask_key_len, const struct flow *flow,
1124                               struct flow *mask)
1125 {
1126     if (mask_key_len) {
1127         enum odp_key_fitness fitness;
1128
1129         fitness = odp_flow_key_to_mask(mask_key, mask_key_len, mask, flow);
1130         if (fitness) {
1131             /* This should not happen: it indicates that
1132              * odp_flow_key_from_mask() and odp_flow_key_to_mask()
1133              * disagree on the acceptable form of a mask.  Log the problem
1134              * as an error, with enough details to enable debugging. */
1135             static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1136
1137             if (!VLOG_DROP_ERR(&rl)) {
1138                 struct ds s;
1139
1140                 ds_init(&s);
1141                 odp_flow_format(key, key_len, mask_key, mask_key_len, NULL, &s,
1142                                 true);
1143                 VLOG_ERR("internal error parsing flow mask %s (%s)",
1144                          ds_cstr(&s), odp_key_fitness_to_string(fitness));
1145                 ds_destroy(&s);
1146             }
1147
1148             return EINVAL;
1149         }
1150     } else {
1151         enum mf_field_id id;
1152         /* No mask key, unwildcard everything except fields whose
1153          * prerequisities are not met. */
1154         memset(mask, 0x0, sizeof *mask);
1155
1156         for (id = 0; id < MFF_N_IDS; ++id) {
1157             /* Skip registers and metadata. */
1158             if (!(id >= MFF_REG0 && id < MFF_REG0 + FLOW_N_REGS)
1159                 && id != MFF_METADATA) {
1160                 const struct mf_field *mf = mf_from_id(id);
1161                 if (mf_are_prereqs_ok(mf, flow)) {
1162                     mf_mask_field(mf, mask);
1163                 }
1164             }
1165         }
1166     }
1167
1168     /* Force unwildcard the in_port.
1169      *
1170      * We need to do this even in the case where we unwildcard "everything"
1171      * above because "everything" only includes the 16-bit OpenFlow port number
1172      * mask->in_port.ofp_port, which only covers half of the 32-bit datapath
1173      * port number mask->in_port.odp_port. */
1174     mask->in_port.odp_port = u32_to_odp(UINT32_MAX);
1175
1176     return 0;
1177 }
1178
1179 static int
1180 dpif_netdev_flow_from_nlattrs(const struct nlattr *key, uint32_t key_len,
1181                               struct flow *flow)
1182 {
1183     odp_port_t in_port;
1184
1185     if (odp_flow_key_to_flow(key, key_len, flow)) {
1186         /* This should not happen: it indicates that odp_flow_key_from_flow()
1187          * and odp_flow_key_to_flow() disagree on the acceptable form of a
1188          * flow.  Log the problem as an error, with enough details to enable
1189          * debugging. */
1190         static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1191
1192         if (!VLOG_DROP_ERR(&rl)) {
1193             struct ds s;
1194
1195             ds_init(&s);
1196             odp_flow_format(key, key_len, NULL, 0, NULL, &s, true);
1197             VLOG_ERR("internal error parsing flow key %s", ds_cstr(&s));
1198             ds_destroy(&s);
1199         }
1200
1201         return EINVAL;
1202     }
1203
1204     in_port = flow->in_port.odp_port;
1205     if (!is_valid_port_number(in_port) && in_port != ODPP_NONE) {
1206         return EINVAL;
1207     }
1208
1209     return 0;
1210 }
1211
1212 static int
1213 dpif_netdev_flow_get(const struct dpif *dpif,
1214                      const struct nlattr *nl_key, size_t nl_key_len,
1215                      struct ofpbuf **actionsp, struct dpif_flow_stats *stats)
1216 {
1217     struct dp_netdev *dp = get_dp_netdev(dpif);
1218     struct dp_netdev_flow *netdev_flow;
1219     struct flow key;
1220     int error;
1221
1222     error = dpif_netdev_flow_from_nlattrs(nl_key, nl_key_len, &key);
1223     if (error) {
1224         return error;
1225     }
1226
1227     netdev_flow = dp_netdev_find_flow(dp, &key);
1228
1229     if (netdev_flow) {
1230         if (stats) {
1231             get_dpif_flow_stats(netdev_flow, stats);
1232         }
1233
1234         if (actionsp) {
1235             struct dp_netdev_actions *actions;
1236
1237             actions = dp_netdev_flow_get_actions(netdev_flow);
1238             *actionsp = ofpbuf_clone_data(actions->actions, actions->size);
1239         }
1240      } else {
1241         error = ENOENT;
1242     }
1243
1244     return error;
1245 }
1246
1247 static int
1248 dp_netdev_flow_add(struct dp_netdev *dp, const struct flow *flow,
1249                    const struct flow_wildcards *wc,
1250                    const struct nlattr *actions,
1251                    size_t actions_len)
1252     OVS_REQUIRES(dp->flow_mutex)
1253 {
1254     struct dp_netdev_flow *netdev_flow;
1255     struct match match;
1256
1257     netdev_flow = xzalloc(sizeof *netdev_flow);
1258     *CONST_CAST(struct flow *, &netdev_flow->flow) = *flow;
1259
1260     ovsthread_stats_init(&netdev_flow->stats);
1261
1262     ovsrcu_set(&netdev_flow->actions,
1263                dp_netdev_actions_create(actions, actions_len));
1264
1265     match_init(&match, flow, wc);
1266     cls_rule_init(CONST_CAST(struct cls_rule *, &netdev_flow->cr),
1267                   &match, NETDEV_RULE_PRIORITY);
1268     cmap_insert(&dp->flow_table,
1269                 CONST_CAST(struct cmap_node *, &netdev_flow->node),
1270                 flow_hash(flow, 0));
1271     fat_rwlock_wrlock(&dp->cls.rwlock);
1272     classifier_insert(&dp->cls,
1273                       CONST_CAST(struct cls_rule *, &netdev_flow->cr));
1274     fat_rwlock_unlock(&dp->cls.rwlock);
1275
1276     return 0;
1277 }
1278
1279 static void
1280 clear_stats(struct dp_netdev_flow *netdev_flow)
1281 {
1282     struct dp_netdev_flow_stats *bucket;
1283     size_t i;
1284
1285     OVSTHREAD_STATS_FOR_EACH_BUCKET (bucket, i, &netdev_flow->stats) {
1286         ovs_mutex_lock(&bucket->mutex);
1287         bucket->used = 0;
1288         bucket->packet_count = 0;
1289         bucket->byte_count = 0;
1290         bucket->tcp_flags = 0;
1291         ovs_mutex_unlock(&bucket->mutex);
1292     }
1293 }
1294
1295 static int
1296 dpif_netdev_flow_put(struct dpif *dpif, const struct dpif_flow_put *put)
1297 {
1298     struct dp_netdev *dp = get_dp_netdev(dpif);
1299     struct dp_netdev_flow *netdev_flow;
1300     struct flow flow;
1301     struct miniflow miniflow;
1302     struct flow_wildcards wc;
1303     int error;
1304
1305     error = dpif_netdev_flow_from_nlattrs(put->key, put->key_len, &flow);
1306     if (error) {
1307         return error;
1308     }
1309     error = dpif_netdev_mask_from_nlattrs(put->key, put->key_len,
1310                                           put->mask, put->mask_len,
1311                                           &flow, &wc.masks);
1312     if (error) {
1313         return error;
1314     }
1315     miniflow_init(&miniflow, &flow);
1316
1317     ovs_mutex_lock(&dp->flow_mutex);
1318     fat_rwlock_rdlock(&dp->cls.rwlock);
1319     netdev_flow = dp_netdev_lookup_flow(dp, &miniflow);
1320     fat_rwlock_unlock(&dp->cls.rwlock);
1321     if (!netdev_flow) {
1322         if (put->flags & DPIF_FP_CREATE) {
1323             if (cmap_count(&dp->flow_table) < MAX_FLOWS) {
1324                 if (put->stats) {
1325                     memset(put->stats, 0, sizeof *put->stats);
1326                 }
1327                 error = dp_netdev_flow_add(dp, &flow, &wc, put->actions,
1328                                            put->actions_len);
1329             } else {
1330                 error = EFBIG;
1331             }
1332         } else {
1333             error = ENOENT;
1334         }
1335     } else {
1336         if (put->flags & DPIF_FP_MODIFY
1337             && flow_equal(&flow, &netdev_flow->flow)) {
1338             struct dp_netdev_actions *new_actions;
1339             struct dp_netdev_actions *old_actions;
1340
1341             new_actions = dp_netdev_actions_create(put->actions,
1342                                                    put->actions_len);
1343
1344             old_actions = dp_netdev_flow_get_actions(netdev_flow);
1345             ovsrcu_set(&netdev_flow->actions, new_actions);
1346
1347             if (put->stats) {
1348                 get_dpif_flow_stats(netdev_flow, put->stats);
1349             }
1350             if (put->flags & DPIF_FP_ZERO_STATS) {
1351                 clear_stats(netdev_flow);
1352             }
1353
1354             ovsrcu_postpone(dp_netdev_actions_free, old_actions);
1355         } else if (put->flags & DPIF_FP_CREATE) {
1356             error = EEXIST;
1357         } else {
1358             /* Overlapping flow. */
1359             error = EINVAL;
1360         }
1361     }
1362     ovs_mutex_unlock(&dp->flow_mutex);
1363     miniflow_destroy(&miniflow);
1364
1365     return error;
1366 }
1367
1368 static int
1369 dpif_netdev_flow_del(struct dpif *dpif, const struct dpif_flow_del *del)
1370 {
1371     struct dp_netdev *dp = get_dp_netdev(dpif);
1372     struct dp_netdev_flow *netdev_flow;
1373     struct flow key;
1374     int error;
1375
1376     error = dpif_netdev_flow_from_nlattrs(del->key, del->key_len, &key);
1377     if (error) {
1378         return error;
1379     }
1380
1381     ovs_mutex_lock(&dp->flow_mutex);
1382     netdev_flow = dp_netdev_find_flow(dp, &key);
1383     if (netdev_flow) {
1384         if (del->stats) {
1385             get_dpif_flow_stats(netdev_flow, del->stats);
1386         }
1387         fat_rwlock_wrlock(&dp->cls.rwlock);
1388         dp_netdev_remove_flow(dp, netdev_flow);
1389         fat_rwlock_unlock(&dp->cls.rwlock);
1390     } else {
1391         error = ENOENT;
1392     }
1393     ovs_mutex_unlock(&dp->flow_mutex);
1394
1395     return error;
1396 }
1397
1398 struct dpif_netdev_flow_dump {
1399     struct dpif_flow_dump up;
1400     struct cmap_position pos;
1401     int status;
1402     struct ovs_mutex mutex;
1403 };
1404
1405 static struct dpif_netdev_flow_dump *
1406 dpif_netdev_flow_dump_cast(struct dpif_flow_dump *dump)
1407 {
1408     return CONTAINER_OF(dump, struct dpif_netdev_flow_dump, up);
1409 }
1410
1411 static struct dpif_flow_dump *
1412 dpif_netdev_flow_dump_create(const struct dpif *dpif_)
1413 {
1414     struct dpif_netdev_flow_dump *dump;
1415
1416     dump = xmalloc(sizeof *dump);
1417     dpif_flow_dump_init(&dump->up, dpif_);
1418     memset(&dump->pos, 0, sizeof dump->pos);
1419     dump->status = 0;
1420     ovs_mutex_init(&dump->mutex);
1421
1422     return &dump->up;
1423 }
1424
1425 static int
1426 dpif_netdev_flow_dump_destroy(struct dpif_flow_dump *dump_)
1427 {
1428     struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_);
1429
1430     ovs_mutex_destroy(&dump->mutex);
1431     free(dump);
1432     return 0;
1433 }
1434
1435 struct dpif_netdev_flow_dump_thread {
1436     struct dpif_flow_dump_thread up;
1437     struct dpif_netdev_flow_dump *dump;
1438     struct odputil_keybuf keybuf[FLOW_DUMP_MAX_BATCH];
1439     struct odputil_keybuf maskbuf[FLOW_DUMP_MAX_BATCH];
1440 };
1441
1442 static struct dpif_netdev_flow_dump_thread *
1443 dpif_netdev_flow_dump_thread_cast(struct dpif_flow_dump_thread *thread)
1444 {
1445     return CONTAINER_OF(thread, struct dpif_netdev_flow_dump_thread, up);
1446 }
1447
1448 static struct dpif_flow_dump_thread *
1449 dpif_netdev_flow_dump_thread_create(struct dpif_flow_dump *dump_)
1450 {
1451     struct dpif_netdev_flow_dump *dump = dpif_netdev_flow_dump_cast(dump_);
1452     struct dpif_netdev_flow_dump_thread *thread;
1453
1454     thread = xmalloc(sizeof *thread);
1455     dpif_flow_dump_thread_init(&thread->up, &dump->up);
1456     thread->dump = dump;
1457     return &thread->up;
1458 }
1459
1460 static void
1461 dpif_netdev_flow_dump_thread_destroy(struct dpif_flow_dump_thread *thread_)
1462 {
1463     struct dpif_netdev_flow_dump_thread *thread
1464         = dpif_netdev_flow_dump_thread_cast(thread_);
1465
1466     free(thread);
1467 }
1468
1469 /* XXX the caller must use 'actions' without quiescing */
1470 static int
1471 dpif_netdev_flow_dump_next(struct dpif_flow_dump_thread *thread_,
1472                            struct dpif_flow *flows, int max_flows)
1473 {
1474     struct dpif_netdev_flow_dump_thread *thread
1475         = dpif_netdev_flow_dump_thread_cast(thread_);
1476     struct dpif_netdev_flow_dump *dump = thread->dump;
1477     struct dpif_netdev *dpif = dpif_netdev_cast(thread->up.dpif);
1478     struct dp_netdev_flow *netdev_flows[FLOW_DUMP_MAX_BATCH];
1479     struct dp_netdev *dp = get_dp_netdev(&dpif->dpif);
1480     int n_flows = 0;
1481     int i;
1482
1483     ovs_mutex_lock(&dump->mutex);
1484     if (!dump->status) {
1485         for (n_flows = 0; n_flows < MIN(max_flows, FLOW_DUMP_MAX_BATCH);
1486              n_flows++) {
1487             struct cmap_node *node;
1488
1489             node = cmap_next_position(&dp->flow_table, &dump->pos);
1490             if (!node) {
1491                 dump->status = EOF;
1492                 break;
1493             }
1494             netdev_flows[n_flows] = CONTAINER_OF(node, struct dp_netdev_flow,
1495                                                  node);
1496         }
1497     }
1498     ovs_mutex_unlock(&dump->mutex);
1499
1500     for (i = 0; i < n_flows; i++) {
1501         struct odputil_keybuf *maskbuf = &thread->maskbuf[i];
1502         struct odputil_keybuf *keybuf = &thread->keybuf[i];
1503         struct dp_netdev_flow *netdev_flow = netdev_flows[i];
1504         struct dpif_flow *f = &flows[i];
1505         struct dp_netdev_actions *dp_actions;
1506         struct flow_wildcards wc;
1507         struct ofpbuf buf;
1508
1509         minimask_expand(&netdev_flow->cr.match.mask, &wc);
1510
1511         /* Key. */
1512         ofpbuf_use_stack(&buf, keybuf, sizeof *keybuf);
1513         odp_flow_key_from_flow(&buf, &netdev_flow->flow, &wc.masks,
1514                                netdev_flow->flow.in_port.odp_port, true);
1515         f->key = ofpbuf_data(&buf);
1516         f->key_len = ofpbuf_size(&buf);
1517
1518         /* Mask. */
1519         ofpbuf_use_stack(&buf, maskbuf, sizeof *maskbuf);
1520         odp_flow_key_from_mask(&buf, &wc.masks, &netdev_flow->flow,
1521                                odp_to_u32(wc.masks.in_port.odp_port),
1522                                SIZE_MAX, true);
1523         f->mask = ofpbuf_data(&buf);
1524         f->mask_len = ofpbuf_size(&buf);
1525
1526         /* Actions. */
1527         dp_actions = dp_netdev_flow_get_actions(netdev_flow);
1528         f->actions = dp_actions->actions;
1529         f->actions_len = dp_actions->size;
1530
1531         /* Stats. */
1532         get_dpif_flow_stats(netdev_flow, &f->stats);
1533     }
1534
1535     return n_flows;
1536 }
1537
1538 static int
1539 dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
1540 {
1541     struct dp_netdev *dp = get_dp_netdev(dpif);
1542     struct dpif_packet packet, *pp;
1543     struct pkt_metadata *md = &execute->md;
1544
1545     if (ofpbuf_size(execute->packet) < ETH_HEADER_LEN ||
1546         ofpbuf_size(execute->packet) > UINT16_MAX) {
1547         return EINVAL;
1548     }
1549
1550     packet.ofpbuf = *execute->packet;
1551     pp = &packet;
1552
1553     dp_netdev_execute_actions(dp, &pp, 1, false, md,
1554                               execute->actions, execute->actions_len);
1555
1556     /* Even though may_steal is set to false, some actions could modify or
1557      * reallocate the ofpbuf memory. We need to pass those changes to the
1558      * caller */
1559     *execute->packet = packet.ofpbuf;
1560
1561     return 0;
1562 }
1563
1564 static void
1565 dp_netdev_destroy_all_queues(struct dp_netdev *dp)
1566     OVS_REQ_WRLOCK(dp->queue_rwlock)
1567 {
1568     size_t i;
1569
1570     dp_netdev_purge_queues(dp);
1571
1572     for (i = 0; i < dp->n_handlers; i++) {
1573         struct dp_netdev_queue *q = &dp->handler_queues[i];
1574
1575         ovs_mutex_destroy(&q->mutex);
1576         seq_destroy(q->seq);
1577     }
1578     free(dp->handler_queues);
1579     dp->handler_queues = NULL;
1580     dp->n_handlers = 0;
1581 }
1582
1583 static void
1584 dp_netdev_refresh_queues(struct dp_netdev *dp, uint32_t n_handlers)
1585     OVS_REQ_WRLOCK(dp->queue_rwlock)
1586 {
1587     if (dp->n_handlers != n_handlers) {
1588         size_t i;
1589
1590         dp_netdev_destroy_all_queues(dp);
1591
1592         dp->n_handlers = n_handlers;
1593         dp->handler_queues = xzalloc(n_handlers * sizeof *dp->handler_queues);
1594
1595         for (i = 0; i < n_handlers; i++) {
1596             struct dp_netdev_queue *q = &dp->handler_queues[i];
1597
1598             ovs_mutex_init(&q->mutex);
1599             q->seq = seq_create();
1600         }
1601     }
1602 }
1603
1604 static int
1605 dpif_netdev_recv_set(struct dpif *dpif, bool enable)
1606 {
1607     struct dp_netdev *dp = get_dp_netdev(dpif);
1608
1609     if ((dp->handler_queues != NULL) == enable) {
1610         return 0;
1611     }
1612
1613     fat_rwlock_wrlock(&dp->queue_rwlock);
1614     if (!enable) {
1615         dp_netdev_destroy_all_queues(dp);
1616     } else {
1617         dp_netdev_refresh_queues(dp, 1);
1618     }
1619     fat_rwlock_unlock(&dp->queue_rwlock);
1620
1621     return 0;
1622 }
1623
1624 static int
1625 dpif_netdev_handlers_set(struct dpif *dpif, uint32_t n_handlers)
1626 {
1627     struct dp_netdev *dp = get_dp_netdev(dpif);
1628
1629     fat_rwlock_wrlock(&dp->queue_rwlock);
1630     if (dp->handler_queues) {
1631         dp_netdev_refresh_queues(dp, n_handlers);
1632     }
1633     fat_rwlock_unlock(&dp->queue_rwlock);
1634
1635     return 0;
1636 }
1637
1638 static int
1639 dpif_netdev_queue_to_priority(const struct dpif *dpif OVS_UNUSED,
1640                               uint32_t queue_id, uint32_t *priority)
1641 {
1642     *priority = queue_id;
1643     return 0;
1644 }
1645
1646 static bool
1647 dp_netdev_recv_check(const struct dp_netdev *dp, const uint32_t handler_id)
1648     OVS_REQ_RDLOCK(dp->queue_rwlock)
1649 {
1650     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
1651
1652     if (!dp->handler_queues) {
1653         VLOG_WARN_RL(&rl, "receiving upcall disabled");
1654         return false;
1655     }
1656
1657     if (handler_id >= dp->n_handlers) {
1658         VLOG_WARN_RL(&rl, "handler index out of bound");
1659         return false;
1660     }
1661
1662     return true;
1663 }
1664
1665 static int
1666 dpif_netdev_recv(struct dpif *dpif, uint32_t handler_id,
1667                  struct dpif_upcall *upcall, struct ofpbuf *buf)
1668 {
1669     struct dp_netdev *dp = get_dp_netdev(dpif);
1670     struct dp_netdev_queue *q;
1671     int error = 0;
1672
1673     fat_rwlock_rdlock(&dp->queue_rwlock);
1674
1675     if (!dp_netdev_recv_check(dp, handler_id)) {
1676         error = EAGAIN;
1677         goto out;
1678     }
1679
1680     q = &dp->handler_queues[handler_id];
1681     ovs_mutex_lock(&q->mutex);
1682     if (q->head != q->tail) {
1683         struct dp_netdev_upcall *u = &q->upcalls[q->tail++ & QUEUE_MASK];
1684
1685         *upcall = u->upcall;
1686
1687         ofpbuf_uninit(buf);
1688         *buf = u->buf;
1689     } else {
1690         error = EAGAIN;
1691     }
1692     ovs_mutex_unlock(&q->mutex);
1693
1694 out:
1695     fat_rwlock_unlock(&dp->queue_rwlock);
1696
1697     return error;
1698 }
1699
1700 static void
1701 dpif_netdev_recv_wait(struct dpif *dpif, uint32_t handler_id)
1702 {
1703     struct dp_netdev *dp = get_dp_netdev(dpif);
1704     struct dp_netdev_queue *q;
1705     uint64_t seq;
1706
1707     fat_rwlock_rdlock(&dp->queue_rwlock);
1708
1709     if (!dp_netdev_recv_check(dp, handler_id)) {
1710         goto out;
1711     }
1712
1713     q = &dp->handler_queues[handler_id];
1714     ovs_mutex_lock(&q->mutex);
1715     seq = seq_read(q->seq);
1716     if (q->head != q->tail) {
1717         poll_immediate_wake();
1718     } else {
1719         seq_wait(q->seq, seq);
1720     }
1721
1722     ovs_mutex_unlock(&q->mutex);
1723
1724 out:
1725     fat_rwlock_unlock(&dp->queue_rwlock);
1726 }
1727
1728 static void
1729 dpif_netdev_recv_purge(struct dpif *dpif)
1730 {
1731     struct dpif_netdev *dpif_netdev = dpif_netdev_cast(dpif);
1732
1733     fat_rwlock_wrlock(&dpif_netdev->dp->queue_rwlock);
1734     dp_netdev_purge_queues(dpif_netdev->dp);
1735     fat_rwlock_unlock(&dpif_netdev->dp->queue_rwlock);
1736 }
1737 \f
1738 /* Creates and returns a new 'struct dp_netdev_actions', with a reference count
1739  * of 1, whose actions are a copy of from the 'ofpacts_len' bytes of
1740  * 'ofpacts'. */
1741 struct dp_netdev_actions *
1742 dp_netdev_actions_create(const struct nlattr *actions, size_t size)
1743 {
1744     struct dp_netdev_actions *netdev_actions;
1745
1746     netdev_actions = xmalloc(sizeof *netdev_actions);
1747     netdev_actions->actions = xmemdup(actions, size);
1748     netdev_actions->size = size;
1749
1750     return netdev_actions;
1751 }
1752
1753 struct dp_netdev_actions *
1754 dp_netdev_flow_get_actions(const struct dp_netdev_flow *flow)
1755 {
1756     return ovsrcu_get(struct dp_netdev_actions *, &flow->actions);
1757 }
1758
1759 static void
1760 dp_netdev_actions_free(struct dp_netdev_actions *actions)
1761 {
1762     free(actions->actions);
1763     free(actions);
1764 }
1765 \f
1766
1767 static void
1768 dp_netdev_process_rxq_port(struct dp_netdev *dp,
1769                           struct dp_netdev_port *port,
1770                           struct netdev_rxq *rxq)
1771 {
1772     struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
1773     int error, cnt;
1774
1775     error = netdev_rxq_recv(rxq, packets, &cnt);
1776     if (!error) {
1777         dp_netdev_port_input(dp, packets, cnt, port->port_no);
1778     } else if (error != EAGAIN && error != EOPNOTSUPP) {
1779         static struct vlog_rate_limit rl
1780             = VLOG_RATE_LIMIT_INIT(1, 5);
1781
1782         VLOG_ERR_RL(&rl, "error receiving data from %s: %s",
1783                     netdev_get_name(port->netdev),
1784                     ovs_strerror(error));
1785     }
1786 }
1787
1788 static void
1789 dpif_netdev_run(struct dpif *dpif)
1790 {
1791     struct dp_netdev_port *port;
1792     struct dp_netdev *dp = get_dp_netdev(dpif);
1793
1794     CMAP_FOR_EACH (port, node, &dp->ports) {
1795         if (!netdev_is_pmd(port->netdev)) {
1796             int i;
1797
1798             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1799                 dp_netdev_process_rxq_port(dp, port, port->rxq[i]);
1800             }
1801         }
1802     }
1803 }
1804
1805 static void
1806 dpif_netdev_wait(struct dpif *dpif)
1807 {
1808     struct dp_netdev_port *port;
1809     struct dp_netdev *dp = get_dp_netdev(dpif);
1810
1811     ovs_mutex_lock(&dp_netdev_mutex);
1812     CMAP_FOR_EACH (port, node, &dp->ports) {
1813         if (!netdev_is_pmd(port->netdev)) {
1814             int i;
1815
1816             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1817                 netdev_rxq_wait(port->rxq[i]);
1818             }
1819         }
1820     }
1821     ovs_mutex_unlock(&dp_netdev_mutex);
1822 }
1823
1824 struct rxq_poll {
1825     struct dp_netdev_port *port;
1826     struct netdev_rxq *rx;
1827 };
1828
1829 static int
1830 pmd_load_queues(struct pmd_thread *f,
1831                 struct rxq_poll **ppoll_list, int poll_cnt)
1832 {
1833     struct dp_netdev *dp = f->dp;
1834     struct rxq_poll *poll_list = *ppoll_list;
1835     struct dp_netdev_port *port;
1836     int id = f->id;
1837     int index;
1838     int i;
1839
1840     /* Simple scheduler for netdev rx polling. */
1841     for (i = 0; i < poll_cnt; i++) {
1842          port_unref(poll_list[i].port);
1843     }
1844
1845     poll_cnt = 0;
1846     index = 0;
1847
1848     CMAP_FOR_EACH (port, node, &f->dp->ports) {
1849         if (netdev_is_pmd(port->netdev)) {
1850             int i;
1851
1852             for (i = 0; i < netdev_n_rxq(port->netdev); i++) {
1853                 if ((index % dp->n_pmd_threads) == id) {
1854                     poll_list = xrealloc(poll_list, sizeof *poll_list * (poll_cnt + 1));
1855
1856                     port_ref(port);
1857                     poll_list[poll_cnt].port = port;
1858                     poll_list[poll_cnt].rx = port->rxq[i];
1859                     poll_cnt++;
1860                 }
1861                 index++;
1862             }
1863         }
1864     }
1865
1866     *ppoll_list = poll_list;
1867     return poll_cnt;
1868 }
1869
1870 static void *
1871 pmd_thread_main(void *f_)
1872 {
1873     struct pmd_thread *f = f_;
1874     struct dp_netdev *dp = f->dp;
1875     unsigned int lc = 0;
1876     struct rxq_poll *poll_list;
1877     unsigned int port_seq;
1878     int poll_cnt;
1879     int i;
1880
1881     poll_cnt = 0;
1882     poll_list = NULL;
1883
1884     pmd_thread_setaffinity_cpu(f->id);
1885 reload:
1886     poll_cnt = pmd_load_queues(f, &poll_list, poll_cnt);
1887     atomic_read(&f->change_seq, &port_seq);
1888
1889     for (;;) {
1890         unsigned int c_port_seq;
1891         int i;
1892
1893         for (i = 0; i < poll_cnt; i++) {
1894             dp_netdev_process_rxq_port(dp,  poll_list[i].port, poll_list[i].rx);
1895         }
1896
1897         if (lc++ > 1024) {
1898             ovsrcu_quiesce();
1899
1900             /* TODO: need completely userspace based signaling method.
1901              * to keep this thread entirely in userspace.
1902              * For now using atomic counter. */
1903             lc = 0;
1904             atomic_read_explicit(&f->change_seq, &c_port_seq, memory_order_consume);
1905             if (c_port_seq != port_seq) {
1906                 break;
1907             }
1908         }
1909     }
1910
1911     if (!latch_is_set(&f->dp->exit_latch)){
1912         goto reload;
1913     }
1914
1915     for (i = 0; i < poll_cnt; i++) {
1916          port_unref(poll_list[i].port);
1917     }
1918
1919     free(poll_list);
1920     return NULL;
1921 }
1922
1923 static void
1924 dp_netdev_set_pmd_threads(struct dp_netdev *dp, int n)
1925 {
1926     int i;
1927
1928     if (n == dp->n_pmd_threads) {
1929         return;
1930     }
1931
1932     /* Stop existing threads. */
1933     latch_set(&dp->exit_latch);
1934     dp_netdev_reload_pmd_threads(dp);
1935     for (i = 0; i < dp->n_pmd_threads; i++) {
1936         struct pmd_thread *f = &dp->pmd_threads[i];
1937
1938         xpthread_join(f->thread, NULL);
1939     }
1940     latch_poll(&dp->exit_latch);
1941     free(dp->pmd_threads);
1942
1943     /* Start new threads. */
1944     dp->pmd_threads = xmalloc(n * sizeof *dp->pmd_threads);
1945     dp->n_pmd_threads = n;
1946
1947     for (i = 0; i < n; i++) {
1948         struct pmd_thread *f = &dp->pmd_threads[i];
1949
1950         f->dp = dp;
1951         f->id = i;
1952         atomic_store(&f->change_seq, 1);
1953
1954         /* Each thread will distribute all devices rx-queues among
1955          * themselves. */
1956         f->thread = ovs_thread_create("pmd", pmd_thread_main, f);
1957     }
1958 }
1959
1960 \f
1961 static void *
1962 dp_netdev_flow_stats_new_cb(void)
1963 {
1964     struct dp_netdev_flow_stats *bucket = xzalloc_cacheline(sizeof *bucket);
1965     ovs_mutex_init(&bucket->mutex);
1966     return bucket;
1967 }
1968
1969 static void
1970 dp_netdev_flow_used(struct dp_netdev_flow *netdev_flow,
1971                     int cnt, int size,
1972                     uint16_t tcp_flags)
1973 {
1974     long long int now = time_msec();
1975     struct dp_netdev_flow_stats *bucket;
1976
1977     bucket = ovsthread_stats_bucket_get(&netdev_flow->stats,
1978                                         dp_netdev_flow_stats_new_cb);
1979
1980     ovs_mutex_lock(&bucket->mutex);
1981     bucket->used = MAX(now, bucket->used);
1982     bucket->packet_count += cnt;
1983     bucket->byte_count += size;
1984     bucket->tcp_flags |= tcp_flags;
1985     ovs_mutex_unlock(&bucket->mutex);
1986 }
1987
1988 static void *
1989 dp_netdev_stats_new_cb(void)
1990 {
1991     struct dp_netdev_stats *bucket = xzalloc_cacheline(sizeof *bucket);
1992     ovs_mutex_init(&bucket->mutex);
1993     return bucket;
1994 }
1995
1996 static void
1997 dp_netdev_count_packet(struct dp_netdev *dp, enum dp_stat_type type, int cnt)
1998 {
1999     struct dp_netdev_stats *bucket;
2000
2001     bucket = ovsthread_stats_bucket_get(&dp->stats, dp_netdev_stats_new_cb);
2002     ovs_mutex_lock(&bucket->mutex);
2003     bucket->n[type] += cnt;
2004     ovs_mutex_unlock(&bucket->mutex);
2005 }
2006
2007 struct packet_batch {
2008     unsigned int packet_count;
2009     unsigned int byte_count;
2010     uint16_t tcp_flags;
2011
2012     struct dp_netdev_flow *flow;
2013
2014     struct dpif_packet *packets[NETDEV_MAX_RX_BATCH];
2015     struct pkt_metadata md;
2016 };
2017
2018 static inline void
2019 packet_batch_update(struct packet_batch *batch,
2020                     struct dpif_packet *packet, const struct miniflow *mf)
2021 {
2022     batch->tcp_flags |= miniflow_get_tcp_flags(mf);
2023     batch->packets[batch->packet_count++] = packet;
2024     batch->byte_count += ofpbuf_size(&packet->ofpbuf);
2025 }
2026
2027 static inline void
2028 packet_batch_init(struct packet_batch *batch, struct dp_netdev_flow *flow,
2029                   struct pkt_metadata *md)
2030 {
2031     batch->flow = flow;
2032     batch->md = *md;
2033
2034     batch->packet_count = 0;
2035     batch->byte_count = 0;
2036     batch->tcp_flags = 0;
2037 }
2038
2039 static inline void
2040 packet_batch_execute(struct packet_batch *batch, struct dp_netdev *dp)
2041 {
2042     struct dp_netdev_actions *actions;
2043     struct dp_netdev_flow *flow = batch->flow;
2044
2045     dp_netdev_flow_used(batch->flow, batch->packet_count, batch->byte_count,
2046                         batch->tcp_flags);
2047
2048     actions = dp_netdev_flow_get_actions(flow);
2049
2050     dp_netdev_execute_actions(dp, batch->packets,
2051                               batch->packet_count, true, &batch->md,
2052                               actions->actions, actions->size);
2053
2054     dp_netdev_count_packet(dp, DP_STAT_HIT, batch->packet_count);
2055 }
2056
2057 static void
2058 dp_netdev_input(struct dp_netdev *dp, struct dpif_packet **packets, int cnt,
2059                 struct pkt_metadata *md)
2060 {
2061     struct packet_batch batches[NETDEV_MAX_RX_BATCH];
2062     struct netdev_flow_key keys[NETDEV_MAX_RX_BATCH];
2063     const struct miniflow *mfs[NETDEV_MAX_RX_BATCH]; /* NULL at bad packets. */
2064     struct cls_rule *rules[NETDEV_MAX_RX_BATCH];
2065     size_t n_batches, i;
2066
2067     for (i = 0; i < cnt; i++) {
2068         if (OVS_UNLIKELY(ofpbuf_size(&packets[i]->ofpbuf) < ETH_HEADER_LEN)) {
2069             dpif_packet_delete(packets[i]);
2070             mfs[i] = NULL;
2071             continue;
2072         }
2073
2074         miniflow_initialize(&keys[i].flow, keys[i].buf);
2075         miniflow_extract(&packets[i]->ofpbuf, md, &keys[i].flow);
2076         mfs[i] = &keys[i].flow;
2077     }
2078
2079     fat_rwlock_rdlock(&dp->cls.rwlock);
2080     classifier_lookup_miniflow_batch(&dp->cls, mfs, rules, cnt);
2081     fat_rwlock_unlock(&dp->cls.rwlock);
2082
2083     n_batches = 0;
2084     for (i = 0; i < cnt; i++) {
2085         struct dp_netdev_flow *flow;
2086         struct packet_batch *batch;
2087         size_t j;
2088
2089         if (OVS_UNLIKELY(!mfs[i])) {
2090             continue;
2091         }
2092
2093         if (OVS_UNLIKELY(!rules[i])) {
2094             dp_netdev_count_packet(dp, DP_STAT_MISS, 1);
2095             if (OVS_LIKELY(dp->handler_queues)) {
2096                 uint32_t hash = miniflow_hash_5tuple(mfs[i], 0);
2097                 struct ofpbuf *buf = &packets[i]->ofpbuf;
2098
2099                 dp_netdev_output_userspace(dp, &buf, 1, hash % dp->n_handlers,
2100                                            DPIF_UC_MISS, mfs[i], NULL);
2101             } else {
2102                 /* No upcall queue.  Freeing the packet */
2103                 dpif_packet_delete(packets[i]);
2104             }
2105             continue;
2106         }
2107
2108         /* XXX: This O(n^2) algortihm makes sense if we're operating under the
2109          * assumption that the number of distinct flows (and therefore the
2110          * number of distinct batches) is quite small.  If this turns out not
2111          * to be the case, it may make sense to pre sort based on the
2112          * netdev_flow pointer.  That done we can get the appropriate batching
2113          * in O(n * log(n)) instead. */
2114         batch = NULL;
2115         flow = dp_netdev_flow_cast(rules[i]);
2116         for (j = 0; j < n_batches; j++) {
2117             if (batches[j].flow == flow) {
2118                 batch = &batches[j];
2119                 break;
2120             }
2121         }
2122
2123         if (!batch) {
2124             batch = &batches[n_batches++];
2125             packet_batch_init(batch, flow, md);
2126         }
2127         packet_batch_update(batch, packets[i], mfs[i]);
2128     }
2129
2130     for (i = 0; i < n_batches; i++) {
2131         packet_batch_execute(&batches[i], dp);
2132     }
2133 }
2134
2135 static void
2136 dp_netdev_port_input(struct dp_netdev *dp, struct dpif_packet **packets,
2137                      int cnt, odp_port_t port_no)
2138 {
2139     uint32_t *recirc_depth = recirc_depth_get();
2140     struct pkt_metadata md = PKT_METADATA_INITIALIZER(port_no);
2141
2142     *recirc_depth = 0;
2143     dp_netdev_input(dp, packets, cnt, &md);
2144 }
2145
2146 static int
2147 dp_netdev_queue_userspace_packet(struct dp_netdev_queue *q,
2148                                  struct ofpbuf *packet, int type,
2149                                  const struct miniflow *key,
2150                                  const struct nlattr *userdata)
2151 OVS_REQUIRES(q->mutex)
2152 {
2153     if (q->head - q->tail < MAX_QUEUE_LEN) {
2154         struct dp_netdev_upcall *u = &q->upcalls[q->head++ & QUEUE_MASK];
2155         struct dpif_upcall *upcall = &u->upcall;
2156         struct ofpbuf *buf = &u->buf;
2157         size_t buf_size;
2158         struct flow flow;
2159
2160         upcall->type = type;
2161
2162         /* Allocate buffer big enough for everything. */
2163         buf_size = ODPUTIL_FLOW_KEY_BYTES;
2164         if (userdata) {
2165             buf_size += NLA_ALIGN(userdata->nla_len);
2166         }
2167         ofpbuf_init(buf, buf_size);
2168
2169         /* Put ODP flow. */
2170         miniflow_expand(key, &flow);
2171         odp_flow_key_from_flow(buf, &flow, NULL, flow.in_port.odp_port, true);
2172         upcall->key = ofpbuf_data(buf);
2173         upcall->key_len = ofpbuf_size(buf);
2174
2175         /* Put userdata. */
2176         if (userdata) {
2177             upcall->userdata = ofpbuf_put(buf, userdata,
2178                     NLA_ALIGN(userdata->nla_len));
2179         }
2180
2181         upcall->packet = *packet;
2182
2183         seq_change(q->seq);
2184
2185         return 0;
2186     } else {
2187         ofpbuf_delete(packet);
2188         return ENOBUFS;
2189     }
2190
2191 }
2192
2193 static int
2194 dp_netdev_output_userspace(struct dp_netdev *dp, struct ofpbuf **packets,
2195                            int cnt, int queue_no, int type,
2196                            const struct miniflow *key,
2197                            const struct nlattr *userdata)
2198 {
2199     struct dp_netdev_queue *q;
2200     int error;
2201     int i;
2202
2203     fat_rwlock_rdlock(&dp->queue_rwlock);
2204     q = &dp->handler_queues[queue_no];
2205     ovs_mutex_lock(&q->mutex);
2206     for (i = 0; i < cnt; i++) {
2207         struct ofpbuf *packet = packets[i];
2208
2209         error = dp_netdev_queue_userspace_packet(q, packet, type, key,
2210                                                  userdata);
2211         if (error == ENOBUFS) {
2212             dp_netdev_count_packet(dp, DP_STAT_LOST, 1);
2213         }
2214     }
2215     ovs_mutex_unlock(&q->mutex);
2216     fat_rwlock_unlock(&dp->queue_rwlock);
2217
2218     return error;
2219 }
2220
2221 struct dp_netdev_execute_aux {
2222     struct dp_netdev *dp;
2223 };
2224
2225 static void
2226 dp_execute_cb(void *aux_, struct dpif_packet **packets, int cnt,
2227               struct pkt_metadata *md,
2228               const struct nlattr *a, bool may_steal)
2229     OVS_NO_THREAD_SAFETY_ANALYSIS
2230 {
2231     struct dp_netdev_execute_aux *aux = aux_;
2232     int type = nl_attr_type(a);
2233     struct dp_netdev_port *p;
2234     uint32_t *depth = recirc_depth_get();
2235     int i;
2236
2237     switch ((enum ovs_action_attr)type) {
2238     case OVS_ACTION_ATTR_OUTPUT:
2239         p = dp_netdev_lookup_port(aux->dp, u32_to_odp(nl_attr_get_u32(a)));
2240         if (OVS_LIKELY(p)) {
2241             netdev_send(p->netdev, packets, cnt, may_steal);
2242         } else if (may_steal) {
2243             for (i = 0; i < cnt; i++) {
2244                 dpif_packet_delete(packets[i]);
2245             }
2246         }
2247         break;
2248
2249     case OVS_ACTION_ATTR_USERSPACE: {
2250         const struct nlattr *userdata;
2251         struct netdev_flow_key key;
2252
2253         userdata = nl_attr_find_nested(a, OVS_USERSPACE_ATTR_USERDATA);
2254
2255         miniflow_initialize(&key.flow, key.buf);
2256
2257         for (i = 0; i < cnt; i++) {
2258             struct ofpbuf *packet, *userspace_packet;
2259
2260             packet = &packets[i]->ofpbuf;
2261
2262             miniflow_extract(packet, md, &key.flow);
2263
2264             userspace_packet = may_steal ? packet : ofpbuf_clone(packet);
2265
2266             dp_netdev_output_userspace(aux->dp, &userspace_packet, 1,
2267                                        miniflow_hash_5tuple(&key.flow, 0)
2268                                            % aux->dp->n_handlers,
2269                                        DPIF_UC_ACTION, &key.flow,
2270                                        userdata);
2271         }
2272         break;
2273     }
2274
2275     case OVS_ACTION_ATTR_HASH: {
2276         const struct ovs_action_hash *hash_act;
2277         struct netdev_flow_key key;
2278         uint32_t hash;
2279
2280         hash_act = nl_attr_get(a);
2281
2282         miniflow_initialize(&key.flow, key.buf);
2283
2284         for (i = 0; i < cnt; i++) {
2285
2286             /* TODO: this is slow. Use RSS hash in the future */
2287             miniflow_extract(&packets[i]->ofpbuf, md, &key.flow);
2288
2289             if (hash_act->hash_alg == OVS_HASH_ALG_L4) {
2290                 /* Hash need not be symmetric, nor does it need to include
2291                  * L2 fields. */
2292                 hash = miniflow_hash_5tuple(&key.flow, hash_act->hash_basis);
2293             } else {
2294                 VLOG_WARN("Unknown hash algorithm specified "
2295                           "for the hash action.");
2296                 hash = 2;
2297             }
2298
2299             if (!hash) {
2300                 hash = 1; /* 0 is not valid */
2301             }
2302
2303             if (i == 0) {
2304                 md->dp_hash = hash;
2305             }
2306             packets[i]->dp_hash = hash;
2307         }
2308         break;
2309     }
2310
2311     case OVS_ACTION_ATTR_RECIRC:
2312         if (*depth < MAX_RECIRC_DEPTH) {
2313
2314             (*depth)++;
2315             for (i = 0; i < cnt; i++) {
2316                 struct dpif_packet *recirc_pkt;
2317                 struct pkt_metadata recirc_md = *md;
2318
2319                 recirc_pkt = (may_steal) ? packets[i]
2320                                     : dpif_packet_clone(packets[i]);
2321
2322                 recirc_md.recirc_id = nl_attr_get_u32(a);
2323
2324                 /* Hash is private to each packet */
2325                 recirc_md.dp_hash = packets[i]->dp_hash;
2326
2327                 dp_netdev_input(aux->dp, &recirc_pkt, 1, &recirc_md);
2328             }
2329             (*depth)--;
2330
2331             break;
2332         } else {
2333             VLOG_WARN("Packet dropped. Max recirculation depth exceeded.");
2334             if (may_steal) {
2335                 for (i = 0; i < cnt; i++) {
2336                     dpif_packet_delete(packets[i]);
2337                 }
2338             }
2339         }
2340         break;
2341
2342     case OVS_ACTION_ATTR_PUSH_VLAN:
2343     case OVS_ACTION_ATTR_POP_VLAN:
2344     case OVS_ACTION_ATTR_PUSH_MPLS:
2345     case OVS_ACTION_ATTR_POP_MPLS:
2346     case OVS_ACTION_ATTR_SET:
2347     case OVS_ACTION_ATTR_SAMPLE:
2348     case OVS_ACTION_ATTR_UNSPEC:
2349     case __OVS_ACTION_ATTR_MAX:
2350         OVS_NOT_REACHED();
2351     }
2352 }
2353
2354 static void
2355 dp_netdev_execute_actions(struct dp_netdev *dp,
2356                           struct dpif_packet **packets, int cnt,
2357                           bool may_steal, struct pkt_metadata *md,
2358                           const struct nlattr *actions, size_t actions_len)
2359 {
2360     struct dp_netdev_execute_aux aux = {dp};
2361
2362     odp_execute_actions(&aux, packets, cnt, may_steal, md, actions,
2363                         actions_len, dp_execute_cb);
2364 }
2365
2366 const struct dpif_class dpif_netdev_class = {
2367     "netdev",
2368     dpif_netdev_enumerate,
2369     dpif_netdev_port_open_type,
2370     dpif_netdev_open,
2371     dpif_netdev_close,
2372     dpif_netdev_destroy,
2373     dpif_netdev_run,
2374     dpif_netdev_wait,
2375     dpif_netdev_get_stats,
2376     dpif_netdev_port_add,
2377     dpif_netdev_port_del,
2378     dpif_netdev_port_query_by_number,
2379     dpif_netdev_port_query_by_name,
2380     NULL,                       /* port_get_pid */
2381     dpif_netdev_port_dump_start,
2382     dpif_netdev_port_dump_next,
2383     dpif_netdev_port_dump_done,
2384     dpif_netdev_port_poll,
2385     dpif_netdev_port_poll_wait,
2386     dpif_netdev_flow_get,
2387     dpif_netdev_flow_put,
2388     dpif_netdev_flow_del,
2389     dpif_netdev_flow_flush,
2390     dpif_netdev_flow_dump_create,
2391     dpif_netdev_flow_dump_destroy,
2392     dpif_netdev_flow_dump_thread_create,
2393     dpif_netdev_flow_dump_thread_destroy,
2394     dpif_netdev_flow_dump_next,
2395     dpif_netdev_execute,
2396     NULL,                       /* operate */
2397     dpif_netdev_recv_set,
2398     dpif_netdev_handlers_set,
2399     dpif_netdev_queue_to_priority,
2400     dpif_netdev_recv,
2401     dpif_netdev_recv_wait,
2402     dpif_netdev_recv_purge,
2403 };
2404
2405 static void
2406 dpif_dummy_change_port_number(struct unixctl_conn *conn, int argc OVS_UNUSED,
2407                               const char *argv[], void *aux OVS_UNUSED)
2408 {
2409     struct dp_netdev_port *old_port;
2410     struct dp_netdev_port *new_port;
2411     struct dp_netdev *dp;
2412     odp_port_t port_no;
2413
2414     ovs_mutex_lock(&dp_netdev_mutex);
2415     dp = shash_find_data(&dp_netdevs, argv[1]);
2416     if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
2417         ovs_mutex_unlock(&dp_netdev_mutex);
2418         unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
2419         return;
2420     }
2421     ovs_refcount_ref(&dp->ref_cnt);
2422     ovs_mutex_unlock(&dp_netdev_mutex);
2423
2424     ovs_mutex_lock(&dp->port_mutex);
2425     if (get_port_by_name(dp, argv[2], &old_port)) {
2426         unixctl_command_reply_error(conn, "unknown port");
2427         goto exit;
2428     }
2429
2430     port_no = u32_to_odp(atoi(argv[3]));
2431     if (!port_no || port_no == ODPP_NONE) {
2432         unixctl_command_reply_error(conn, "bad port number");
2433         goto exit;
2434     }
2435     if (dp_netdev_lookup_port(dp, port_no)) {
2436         unixctl_command_reply_error(conn, "port number already in use");
2437         goto exit;
2438     }
2439
2440     /* Remove old port. */
2441     cmap_remove(&dp->ports, &old_port->node, hash_port_no(old_port->port_no));
2442     ovsrcu_postpone(free, old_port);
2443
2444     /* Insert new port (cmap semantics mean we cannot re-insert 'old_port'). */
2445     new_port = xmemdup(old_port, sizeof *old_port);
2446     new_port->port_no = port_no;
2447     cmap_insert(&dp->ports, &new_port->node, hash_port_no(port_no));
2448
2449     seq_change(dp->port_seq);
2450     unixctl_command_reply(conn, NULL);
2451
2452 exit:
2453     ovs_mutex_unlock(&dp->port_mutex);
2454     dp_netdev_unref(dp);
2455 }
2456
2457 static void
2458 dpif_dummy_delete_port(struct unixctl_conn *conn, int argc OVS_UNUSED,
2459                        const char *argv[], void *aux OVS_UNUSED)
2460 {
2461     struct dp_netdev_port *port;
2462     struct dp_netdev *dp;
2463
2464     ovs_mutex_lock(&dp_netdev_mutex);
2465     dp = shash_find_data(&dp_netdevs, argv[1]);
2466     if (!dp || !dpif_netdev_class_is_dummy(dp->class)) {
2467         ovs_mutex_unlock(&dp_netdev_mutex);
2468         unixctl_command_reply_error(conn, "unknown datapath or not a dummy");
2469         return;
2470     }
2471     ovs_refcount_ref(&dp->ref_cnt);
2472     ovs_mutex_unlock(&dp_netdev_mutex);
2473
2474     ovs_mutex_lock(&dp->port_mutex);
2475     if (get_port_by_name(dp, argv[2], &port)) {
2476         unixctl_command_reply_error(conn, "unknown port");
2477     } else if (port->port_no == ODPP_LOCAL) {
2478         unixctl_command_reply_error(conn, "can't delete local port");
2479     } else {
2480         do_del_port(dp, port);
2481         unixctl_command_reply(conn, NULL);
2482     }
2483     ovs_mutex_unlock(&dp->port_mutex);
2484
2485     dp_netdev_unref(dp);
2486 }
2487
2488 static void
2489 dpif_dummy_register__(const char *type)
2490 {
2491     struct dpif_class *class;
2492
2493     class = xmalloc(sizeof *class);
2494     *class = dpif_netdev_class;
2495     class->type = xstrdup(type);
2496     dp_register_provider(class);
2497 }
2498
2499 void
2500 dpif_dummy_register(bool override)
2501 {
2502     if (override) {
2503         struct sset types;
2504         const char *type;
2505
2506         sset_init(&types);
2507         dp_enumerate_types(&types);
2508         SSET_FOR_EACH (type, &types) {
2509             if (!dp_unregister_provider(type)) {
2510                 dpif_dummy_register__(type);
2511             }
2512         }
2513         sset_destroy(&types);
2514     }
2515
2516     dpif_dummy_register__("dummy");
2517
2518     unixctl_command_register("dpif-dummy/change-port-number",
2519                              "DP PORT NEW-NUMBER",
2520                              3, 3, dpif_dummy_change_port_number, NULL);
2521     unixctl_command_register("dpif-dummy/delete-port", "DP PORT",
2522                              2, 2, dpif_dummy_delete_port, NULL);
2523 }