setup n_upcall_pids for vport_request when destroy all channels
[cascardo/ovs.git] / lib / dpif-linux.c
index b70ddef..9eeb889 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -180,7 +180,8 @@ static int dpif_linux_init(void);
 static int open_dpif(const struct dpif_linux_dp *, struct dpif **);
 static uint32_t dpif_linux_port_get_pid(const struct dpif *,
                                         odp_port_t port_no, uint32_t hash);
-static int dpif_linux_refresh_channels(struct dpif_linux *, uint32_t n_handlers);
+static int dpif_linux_refresh_channels(struct dpif_linux *,
+                                       uint32_t n_handlers);
 static void dpif_linux_vport_to_ofpbuf(const struct dpif_linux_vport *,
                                        struct ofpbuf *);
 static int dpif_linux_vport_from_ofpbuf(struct dpif_linux_vport *,
@@ -459,7 +460,7 @@ vport_del_channels(struct dpif_linux *dpif, odp_port_t port_no)
 }
 
 static void
-destroy_all_channels(struct dpif_linux *dpif)
+destroy_all_channels(struct dpif_linux *dpif) OVS_REQ_WRLOCK(dpif->upcall_lock)
 {
     unsigned int i;
 
@@ -483,6 +484,7 @@ destroy_all_channels(struct dpif_linux *dpif)
         vport_request.cmd = OVS_VPORT_CMD_SET;
         vport_request.dp_ifindex = dpif->dp_ifindex;
         vport_request.port_no = u32_to_odp(i);
+        vport_request.n_upcall_pids = 1;
         vport_request.upcall_pids = &upcall_pids;
         dpif_linux_vport_transact(&vport_request, NULL, NULL);
 
@@ -625,6 +627,7 @@ netdev_to_ovs_vport_type(const struct netdev *netdev)
 static int
 dpif_linux_port_add__(struct dpif_linux *dpif, struct netdev *netdev,
                       odp_port_t *port_nop)
+    OVS_REQ_WRLOCK(dpif->upcall_lock)
 {
     const struct netdev_tunnel_config *tnl_cfg;
     char namebuf[NETDEV_VPORT_NAME_BUFSIZE];
@@ -674,7 +677,7 @@ dpif_linux_port_add__(struct dpif_linux *dpif, struct netdev *netdev,
 
     request.port_no = *port_nop;
     upcall_pids = vport_socksp_to_pids(socksp, dpif->n_handlers);
-    request.n_upcall_pids = dpif->n_handlers;
+    request.n_upcall_pids = socksp ? dpif->n_handlers : 1;
     request.upcall_pids = upcall_pids;
 
     error = dpif_linux_vport_transact(&request, &reply, &buf);
@@ -731,6 +734,7 @@ dpif_linux_port_add(struct dpif *dpif_, struct netdev *netdev,
 
 static int
 dpif_linux_port_del__(struct dpif_linux *dpif, odp_port_t port_no)
+    OVS_REQ_WRLOCK(dpif->upcall_lock)
 {
     struct dpif_linux_vport vport;
     int error;
@@ -809,27 +813,45 @@ dpif_linux_port_query_by_name(const struct dpif *dpif_, const char *devname,
 }
 
 static uint32_t
-dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
-                        uint32_t hash)
+dpif_linux_port_get_pid__(const struct dpif_linux *dpif, odp_port_t port_no,
+                          uint32_t hash)
+    OVS_REQ_RDLOCK(dpif->upcall_lock)
 {
-    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     uint32_t port_idx = odp_to_u32(port_no);
     uint32_t pid = 0;
 
-    fat_rwlock_rdlock(&dpif->upcall_lock);
-    if (dpif->handlers) {
+    if (dpif->handlers && dpif->uc_array_size > 0) {
         /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
          * channel, since it is not heavily loaded. */
         uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
         struct dpif_handler *h = &dpif->handlers[hash % dpif->n_handlers];
 
-        pid = nl_sock_pid(h->channels[idx].sock);
+        /* Needs to check in case the socket pointer is changed in between
+         * the holding of upcall_lock.  A known case happens when the main
+         * thread deletes the vport while the handler thread is handling
+         * the upcall from that port. */
+        if (h->channels[idx].sock) {
+            pid = nl_sock_pid(h->channels[idx].sock);
+        }
     }
-    fat_rwlock_unlock(&dpif->upcall_lock);
 
     return pid;
 }
 
+static uint32_t
+dpif_linux_port_get_pid(const struct dpif *dpif_, odp_port_t port_no,
+                        uint32_t hash)
+{
+    const struct dpif_linux *dpif = dpif_linux_cast(dpif_);
+    uint32_t ret;
+
+    fat_rwlock_rdlock(&dpif->upcall_lock);
+    ret = dpif_linux_port_get_pid__(dpif, port_no, hash);
+    fat_rwlock_unlock(&dpif->upcall_lock);
+
+    return ret;
+}
+
 static int
 dpif_linux_flow_flush(struct dpif *dpif_)
 {
@@ -1026,24 +1048,27 @@ dpif_linux_flow_get__(const struct dpif_linux *dpif,
 static int
 dpif_linux_flow_get(const struct dpif *dpif_,
                     const struct nlattr *key, size_t key_len,
-                    struct ofpbuf **actionsp, struct dpif_flow_stats *stats)
+                    struct ofpbuf **bufp,
+                    struct nlattr **maskp, size_t *mask_len,
+                    struct nlattr **actionsp, size_t *actions_len,
+                    struct dpif_flow_stats *stats)
 {
     const struct dpif_linux *dpif = dpif_linux_cast(dpif_);
     struct dpif_linux_flow reply;
-    struct ofpbuf *buf;
     int error;
 
-    error = dpif_linux_flow_get__(dpif, key, key_len, &reply, &buf);
+    error = dpif_linux_flow_get__(dpif, key, key_len, &reply, bufp);
     if (!error) {
-        if (stats) {
-            dpif_linux_flow_get_stats(&reply, stats);
+        if (maskp) {
+            *maskp = CONST_CAST(struct nlattr *, reply.mask);
+            *mask_len = reply.mask_len;
         }
         if (actionsp) {
-            ofpbuf_set_data(buf, CONST_CAST(struct nlattr *, reply.actions));
-            ofpbuf_set_size(buf, reply.actions_len);
-            *actionsp = buf;
-        } else {
-            ofpbuf_delete(buf);
+            *actionsp = CONST_CAST(struct nlattr *, reply.actions);
+            *actions_len = reply.actions_len;
+        }
+        if (stats) {
+            dpif_linux_flow_get_stats(&reply, stats);
         }
     }
     return error;
@@ -1206,9 +1231,19 @@ dpif_linux_flow_dump_next(const struct dpif *dpif_, void *iter_, void *state_,
         }
 
         if (actions && !state->flow.actions) {
+            struct dpif_linux_flow reply;
+
+            /* Keys are required to be allocated from 'state->buffer' so
+             * they're preserved across calls.  Therefore we need a separate
+             * reply to prevent them from being overwritten.  Actions, however,
+             * don't have this requirement, so it's that fine they're destroyed
+             * on the next call. */
             error = dpif_linux_flow_get__(dpif, state->flow.key,
                                           state->flow.key_len,
-                                          &state->flow, &state->tmp);
+                                          &reply, &state->tmp);
+            state->flow.actions = reply.actions;
+            state->flow.actions_len = reply.actions_len;
+
             if (error == ENOENT) {
                 VLOG_DBG("dumped flow disappeared on get");
             } else if (error) {
@@ -1241,8 +1276,16 @@ static bool
 dpif_linux_flow_dump_next_may_destroy_keys(void *state_)
 {
     struct dpif_linux_flow_state *state = state_;
+    struct dpif_linux_flow flow;
+    struct ofpbuf nlmsg;
 
-    return ofpbuf_size(&state->buffer) ? false : true;
+    /* Check whether there's a flow remaining in the buffer that includes
+     * actions.  (If it does not include actions, then we could end up
+     * destroying keys previously returned trying to retrieve its actions
+     * fails.) */
+    return (!nl_dump_peek(&nlmsg, &state->buffer)
+            || dpif_linux_flow_from_ofpbuf(&flow, &nlmsg)
+            || !flow.actions);
 }
 
 static int
@@ -1310,11 +1353,13 @@ dpif_linux_execute(struct dpif *dpif_, struct dpif_execute *execute)
     return dpif_linux_execute__(dpif->dp_ifindex, execute);
 }
 
-#define MAX_OPS 50
-
-static void
+/* Executes, against 'dpif', up to the first 'n_ops' operations in 'ops'.
+ * Returns the number actually executed (at least 1, if 'n_ops' is
+ * positive). */
+static size_t
 dpif_linux_operate__(struct dpif_linux *dpif, struct dpif_op **ops, size_t n_ops)
 {
+    enum { MAX_OPS = 50 };
 
     struct op_auxdata {
         struct nl_transaction txn;
@@ -1329,13 +1374,12 @@ dpif_linux_operate__(struct dpif_linux *dpif, struct dpif_op **ops, size_t n_ops
     struct nl_transaction *txnsp[MAX_OPS];
     size_t i;
 
-    ovs_assert(n_ops <= MAX_OPS);
+    n_ops = MIN(n_ops, MAX_OPS);
     for (i = 0; i < n_ops; i++) {
         struct op_auxdata *aux = &auxes[i];
         struct dpif_op *op = ops[i];
         struct dpif_flow_put *put;
         struct dpif_flow_del *del;
-        struct dpif_execute *execute;
         struct dpif_linux_flow flow;
 
         ofpbuf_use_stub(&aux->request,
@@ -1367,9 +1411,25 @@ dpif_linux_operate__(struct dpif_linux *dpif, struct dpif_op **ops, size_t n_ops
             break;
 
         case DPIF_OP_EXECUTE:
-            execute = &op->u.execute;
-            dpif_linux_encode_execute(dpif->dp_ifindex, execute,
-                                      &aux->request);
+            /* Can't execute a packet that won't fit in a Netlink attribute. */
+            if (OVS_UNLIKELY(nl_attr_oversized(
+                                 ofpbuf_size(op->u.execute.packet)))) {
+                /* Report an error immediately if this is the first operation.
+                 * Otherwise the easiest thing to do is to postpone to the next
+                 * call (when this will be the first operation). */
+                if (i == 0) {
+                    VLOG_ERR_RL(&error_rl,
+                                "dropping oversized %"PRIu32"-byte packet",
+                                ofpbuf_size(op->u.execute.packet));
+                    op->error = ENOBUFS;
+                    return 1;
+                }
+                n_ops = i;
+            } else {
+                dpif_linux_encode_execute(dpif->dp_ifindex, &op->u.execute,
+                                          &aux->request);
+            }
+
             break;
 
         default:
@@ -1440,6 +1500,8 @@ dpif_linux_operate__(struct dpif_linux *dpif, struct dpif_op **ops, size_t n_ops
         ofpbuf_uninit(&aux->request);
         ofpbuf_uninit(&aux->reply);
     }
+
+    return n_ops;
 }
 
 static void
@@ -1448,8 +1510,7 @@ dpif_linux_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
     while (n_ops > 0) {
-        size_t chunk = MIN(n_ops, MAX_OPS);
-        dpif_linux_operate__(dpif, ops, chunk);
+        size_t chunk = dpif_linux_operate__(dpif, ops, n_ops);
         ops += chunk;
         n_ops -= chunk;
     }
@@ -1461,6 +1522,7 @@ dpif_linux_operate(struct dpif *dpif_, struct dpif_op **ops, size_t n_ops)
  * backing kernel vports. */
 static int
 dpif_linux_refresh_channels(struct dpif_linux *dpif, uint32_t n_handlers)
+    OVS_REQ_WRLOCK(dpif->upcall_lock)
 {
     unsigned long int *keep_channels;
     struct dpif_linux_vport vport;
@@ -1586,6 +1648,7 @@ dpif_linux_refresh_channels(struct dpif_linux *dpif, uint32_t n_handlers)
 
 static int
 dpif_linux_recv_set__(struct dpif_linux *dpif, bool enable)
+    OVS_REQ_WRLOCK(dpif->upcall_lock)
 {
     if ((dpif->handlers != NULL) == enable) {
         return 0;
@@ -1702,6 +1765,7 @@ parse_odp_packet(struct ofpbuf *buf, struct dpif_upcall *upcall,
 static int
 dpif_linux_recv__(struct dpif_linux *dpif, uint32_t handler_id,
                   struct dpif_upcall *upcall, struct ofpbuf *buf)
+    OVS_REQ_RDLOCK(dpif->upcall_lock)
 {
     struct dpif_handler *handler;
     int read_tries = 0;
@@ -1787,25 +1851,30 @@ dpif_linux_recv(struct dpif *dpif_, uint32_t handler_id,
 }
 
 static void
-dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id)
+dpif_linux_recv_wait__(struct dpif_linux *dpif, uint32_t handler_id)
+    OVS_REQ_RDLOCK(dpif->upcall_lock)
 {
-    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
-
-    fat_rwlock_rdlock(&dpif->upcall_lock);
     if (dpif->handlers && handler_id < dpif->n_handlers) {
         struct dpif_handler *handler = &dpif->handlers[handler_id];
 
         poll_fd_wait(handler->epoll_fd, POLLIN);
     }
-    fat_rwlock_unlock(&dpif->upcall_lock);
 }
 
 static void
-dpif_linux_recv_purge(struct dpif *dpif_)
+dpif_linux_recv_wait(struct dpif *dpif_, uint32_t handler_id)
 {
     struct dpif_linux *dpif = dpif_linux_cast(dpif_);
 
-    fat_rwlock_wrlock(&dpif->upcall_lock);
+    fat_rwlock_rdlock(&dpif->upcall_lock);
+    dpif_linux_recv_wait__(dpif, handler_id);
+    fat_rwlock_unlock(&dpif->upcall_lock);
+}
+
+static void
+dpif_linux_recv_purge__(struct dpif_linux *dpif)
+    OVS_REQ_WRLOCK(dpif->upcall_lock)
+{
     if (dpif->handlers) {
         size_t i, j;
 
@@ -1819,6 +1888,15 @@ dpif_linux_recv_purge(struct dpif *dpif_)
             }
         }
     }
+}
+
+static void
+dpif_linux_recv_purge(struct dpif *dpif_)
+{
+    struct dpif_linux *dpif = dpif_linux_cast(dpif_);
+
+    fat_rwlock_wrlock(&dpif->upcall_lock);
+    dpif_linux_recv_purge__(dpif);
     fat_rwlock_unlock(&dpif->upcall_lock);
 }