rconn: Make rconn_packet_counter thread-safe.
[cascardo/ovs.git] / lib / rconn.c
index 9b6cd86..e54c770 100644 (file)
@@ -150,6 +150,7 @@ static void reconnect(struct rconn *);
 static void report_error(struct rconn *, int error);
 static void disconnect(struct rconn *, int error);
 static void flush_queue(struct rconn *);
+static void close_monitor(struct rconn *, size_t idx, int retval);
 static void copy_to_monitor(struct rconn *, const struct ofpbuf *);
 static bool is_connected_state(enum state);
 static bool is_admitted_msg(const struct ofpbuf *);
@@ -378,7 +379,8 @@ reconnect(struct rconn *rc)
         rc->backoff_deadline = time_now() + rc->backoff;
         state_transition(rc, S_CONNECTING);
     } else {
-        VLOG_WARN("%s: connection failed (%s)", rc->name, strerror(retval));
+        VLOG_WARN("%s: connection failed (%s)",
+                  rc->name, ovs_strerror(retval));
         rc->backoff_deadline = TIME_MAX; /* Prevent resetting backoff. */
         disconnect(rc, retval);
     }
@@ -416,7 +418,7 @@ run_CONNECTING(struct rconn *rc)
     } else if (retval != EAGAIN) {
         if (rconn_logging_connection_attempts__(rc)) {
             VLOG_INFO("%s: connection failed (%s)",
-                      rc->name, strerror(retval));
+                      rc->name, ovs_strerror(retval));
         }
         disconnect(rc, retval);
     } else if (timed_out(rc)) {
@@ -512,8 +514,21 @@ rconn_run(struct rconn *rc)
     if (rc->vconn) {
         vconn_run(rc->vconn);
     }
-    for (i = 0; i < rc->n_monitors; i++) {
+    for (i = 0; i < rc->n_monitors; ) {
+        struct ofpbuf *msg;
+        int retval;
+
         vconn_run(rc->monitors[i]);
+
+        /* Drain any stray message that came in on the monitor connection. */
+        retval = vconn_recv(rc->monitors[i], &msg);
+        if (!retval) {
+            ofpbuf_delete(msg);
+        } else if (retval != EAGAIN) {
+            close_monitor(rc, i, retval);
+            continue;
+        }
+        i++;
     }
 
     do {
@@ -544,6 +559,7 @@ rconn_run_wait(struct rconn *rc)
     }
     for (i = 0; i < rc->n_monitors; i++) {
         vconn_run_wait(rc->monitors[i]);
+        vconn_recv_wait(rc->monitors[i]);
     }
 
     timeo = timeout(rc);
@@ -648,7 +664,7 @@ int
 rconn_send_with_limit(struct rconn *rc, struct ofpbuf *b,
                       struct rconn_packet_counter *counter, int queue_limit)
 {
-    if (counter->n_packets < queue_limit) {
+    if (rconn_packet_counter_n_packets(counter) < queue_limit) {
         return rconn_send(rc, b, counter);
     } else {
         COVERAGE_INC(rconn_overflow);
@@ -847,7 +863,10 @@ struct rconn_packet_counter *
 rconn_packet_counter_create(void)
 {
     struct rconn_packet_counter *c = xzalloc(sizeof *c);
+    ovs_mutex_init(&c->mutex);
+    ovs_mutex_lock(&c->mutex);
     c->ref_cnt = 1;
+    ovs_mutex_unlock(&c->mutex);
     return c;
 }
 
@@ -855,8 +874,15 @@ void
 rconn_packet_counter_destroy(struct rconn_packet_counter *c)
 {
     if (c) {
+        bool dead;
+
+        ovs_mutex_lock(&c->mutex);
         ovs_assert(c->ref_cnt > 0);
-        if (!--c->ref_cnt && !c->n_packets) {
+        dead = !--c->ref_cnt && !c->n_packets;
+        ovs_mutex_unlock(&c->mutex);
+
+        if (dead) {
+            ovs_mutex_destroy(&c->mutex);
             free(c);
         }
     }
@@ -865,25 +891,56 @@ rconn_packet_counter_destroy(struct rconn_packet_counter *c)
 void
 rconn_packet_counter_inc(struct rconn_packet_counter *c, unsigned int n_bytes)
 {
+    ovs_mutex_lock(&c->mutex);
     c->n_packets++;
     c->n_bytes += n_bytes;
+    ovs_mutex_unlock(&c->mutex);
 }
 
 void
 rconn_packet_counter_dec(struct rconn_packet_counter *c, unsigned int n_bytes)
 {
-    ovs_assert(c->n_packets > 0);
-    ovs_assert(c->n_bytes >= n_bytes);
+    bool dead = false;
 
-    c->n_bytes -= n_bytes;
+    ovs_mutex_lock(&c->mutex);
+    ovs_assert(c->n_packets > 0);
+    ovs_assert(c->n_packets == 1
+               ? c->n_bytes == n_bytes
+               : c->n_bytes > n_bytes);
     c->n_packets--;
-    if (!c->n_packets) {
-        ovs_assert(!c->n_bytes);
-        if (!c->ref_cnt) {
-            free(c);
-        }
+    c->n_bytes -= n_bytes;
+    dead = !c->n_packets && !c->ref_cnt;
+    ovs_mutex_unlock(&c->mutex);
+
+    if (dead) {
+        ovs_mutex_destroy(&c->mutex);
+        free(c);
     }
 }
+
+unsigned int
+rconn_packet_counter_n_packets(const struct rconn_packet_counter *c)
+{
+    unsigned int n;
+
+    ovs_mutex_lock(&c->mutex);
+    n = c->n_packets;
+    ovs_mutex_unlock(&c->mutex);
+
+    return n;
+}
+
+unsigned int
+rconn_packet_counter_n_bytes(const struct rconn_packet_counter *c)
+{
+    unsigned int n;
+
+    ovs_mutex_lock(&c->mutex);
+    n = c->n_bytes;
+    ovs_mutex_unlock(&c->mutex);
+
+    return n;
+}
 \f
 /* Set rc->target and rc->name to 'target' and 'name', respectively.  If 'name'
  * is null, 'target' is used.
@@ -947,7 +1004,8 @@ report_error(struct rconn *rc, int error)
         enum vlog_level level = rc->reliable ? VLL_INFO : VLL_DBG;
         VLOG(level, "%s: connection closed by peer", rc->name);
     } else {
-        VLOG_WARN("%s: connection dropped (%s)", rc->name, strerror(error));
+        VLOG_WARN("%s: connection dropped (%s)",
+                  rc->name, ovs_strerror(error));
     }
 }
 
@@ -1056,6 +1114,15 @@ state_transition(struct rconn *rc, enum state state)
     rc->state_entered = time_now();
 }
 
+static void
+close_monitor(struct rconn *rc, size_t idx, int retval)
+{
+    VLOG_DBG("%s: closing monitor connection to %s: %s",
+             rconn_get_name(rc), vconn_get_name(rc->monitors[idx]),
+             ovs_retval_to_string(retval));
+    rc->monitors[idx] = rc->monitors[--rc->n_monitors];
+}
+
 static void
 copy_to_monitor(struct rconn *rc, const struct ofpbuf *b)
 {
@@ -1073,10 +1140,7 @@ copy_to_monitor(struct rconn *rc, const struct ofpbuf *b)
         if (!retval) {
             clone = NULL;
         } else if (retval != EAGAIN) {
-            VLOG_DBG("%s: closing monitor connection to %s: %s",
-                     rconn_get_name(rc), vconn_get_name(vconn),
-                     strerror(retval));
-            rc->monitors[i] = rc->monitors[--rc->n_monitors];
+            close_monitor(rc, i, retval);
             continue;
         }
         i++;
@@ -1116,21 +1180,14 @@ is_admitted_msg(const struct ofpbuf *b)
     case OFPTYPE_QUEUE_GET_CONFIG_REPLY:
     case OFPTYPE_GET_ASYNC_REQUEST:
     case OFPTYPE_GET_ASYNC_REPLY:
-    case OFPTYPE_METER_MOD:
-    case OFPTYPE_GROUP_REQUEST:
-    case OFPTYPE_GROUP_REPLY:
-    case OFPTYPE_GROUP_DESC_REQUEST:
-    case OFPTYPE_GROUP_DESC_REPLY:
-    case OFPTYPE_GROUP_FEATURES_REQUEST:
-    case OFPTYPE_GROUP_FEATURES_REPLY:
-    case OFPTYPE_METER_REQUEST:
-    case OFPTYPE_METER_REPLY:
-    case OFPTYPE_METER_CONFIG_REQUEST:
-    case OFPTYPE_METER_CONFIG_REPLY:
-    case OFPTYPE_METER_FEATURES_REQUEST:
-    case OFPTYPE_METER_FEATURES_REPLY:
-    case OFPTYPE_TABLE_FEATURES_REQUEST:
-    case OFPTYPE_TABLE_FEATURES_REPLY:
+    case OFPTYPE_GROUP_STATS_REQUEST:
+    case OFPTYPE_GROUP_STATS_REPLY:
+    case OFPTYPE_GROUP_DESC_STATS_REQUEST:
+    case OFPTYPE_GROUP_DESC_STATS_REPLY:
+    case OFPTYPE_GROUP_FEATURES_STATS_REQUEST:
+    case OFPTYPE_GROUP_FEATURES_STATS_REPLY:
+    case OFPTYPE_TABLE_FEATURES_STATS_REQUEST:
+    case OFPTYPE_TABLE_FEATURES_STATS_REPLY:
         return false;
 
     case OFPTYPE_PACKET_IN:
@@ -1139,6 +1196,7 @@ is_admitted_msg(const struct ofpbuf *b)
     case OFPTYPE_PACKET_OUT:
     case OFPTYPE_FLOW_MOD:
     case OFPTYPE_PORT_MOD:
+    case OFPTYPE_METER_MOD:
     case OFPTYPE_BARRIER_REQUEST:
     case OFPTYPE_BARRIER_REPLY:
     case OFPTYPE_DESC_STATS_REQUEST:
@@ -1155,6 +1213,12 @@ is_admitted_msg(const struct ofpbuf *b)
     case OFPTYPE_QUEUE_STATS_REPLY:
     case OFPTYPE_PORT_DESC_STATS_REQUEST:
     case OFPTYPE_PORT_DESC_STATS_REPLY:
+    case OFPTYPE_METER_STATS_REQUEST:
+    case OFPTYPE_METER_STATS_REPLY:
+    case OFPTYPE_METER_CONFIG_STATS_REQUEST:
+    case OFPTYPE_METER_CONFIG_STATS_REPLY:
+    case OFPTYPE_METER_FEATURES_STATS_REQUEST:
+    case OFPTYPE_METER_FEATURES_STATS_REPLY:
     case OFPTYPE_ROLE_REQUEST:
     case OFPTYPE_ROLE_REPLY:
     case OFPTYPE_SET_FLOW_FORMAT: