ovs-thread: Implement OVS specific barrier.
authorAlex Wang <alexw@nicira.com>
Thu, 29 May 2014 22:37:37 +0000 (15:37 -0700)
committerAlex Wang <alexw@nicira.com>
Mon, 7 Jul 2014 18:27:58 +0000 (11:27 -0700)
Non-leader revalidator thread uses pthread_barrier_* functions in their
main loop to synchronize with leader thread.  However, since those threads
only call poll_block() intermittently, the poll interval check in
poll_block() can wrongly take the time since last call as poll interval
and issue the following warnings:

"Unreasonably long XXXXms poll interval".

To prevent it, this commit implements the barrier struct and operations
for OVS which allow thread to block on barrier via poll_block().

Signed-off-by: Alex Wang <alexw@nicira.com>
Acked-by: Ben Pfaff <blp@nicira.com>
lib/ovs-thread.c
lib/ovs-thread.h
ofproto/ofproto-dpif-upcall.c

index 49b3474..176f434 100644 (file)
@@ -24,6 +24,7 @@
 #include "hash.h"
 #include "ovs-rcu.h"
 #include "poll-loop.h"
+#include "seq.h"
 #include "socket-util.h"
 #include "util.h"
 
@@ -166,10 +167,6 @@ XPTHREAD_FUNC1(pthread_cond_destroy, pthread_cond_t *);
 XPTHREAD_FUNC1(pthread_cond_signal, pthread_cond_t *);
 XPTHREAD_FUNC1(pthread_cond_broadcast, pthread_cond_t *);
 
-XPTHREAD_FUNC3(pthread_barrier_init, pthread_barrier_t *,
-               pthread_barrierattr_t *, unsigned int);
-XPTHREAD_FUNC1(pthread_barrier_destroy, pthread_barrier_t *);
-
 XPTHREAD_FUNC2(pthread_join, pthread_t, void **);
 
 typedef void destructor_func(void *);
@@ -255,20 +252,43 @@ ovs_mutex_cond_wait(pthread_cond_t *cond, const struct ovs_mutex *mutex_)
     }
 }
 
-int
-xpthread_barrier_wait(pthread_barrier_t *barrier)
+/* Initializes the 'barrier'.  'size' is the number of threads
+ * expected to hit the barrier. */
+void
+ovs_barrier_init(struct ovs_barrier *barrier, uint32_t size)
 {
-    int error;
+    barrier->size = size;
+    atomic_init(&barrier->count, 0);
+    barrier->seq = seq_create();
+}
 
-    ovsrcu_quiesce_start();
-    error = pthread_barrier_wait(barrier);
-    ovsrcu_quiesce_end();
+/* Destroys the 'barrier'. */
+void
+ovs_barrier_destroy(struct ovs_barrier *barrier)
+{
+    seq_destroy(barrier->seq);
+}
 
-    if (error && OVS_UNLIKELY(error != PTHREAD_BARRIER_SERIAL_THREAD)) {
-        ovs_abort(error, "pthread_barrier_wait failed");
+/* Makes the calling thread block on the 'barrier' until all
+ * 'barrier->size' threads hit the barrier. */
+void
+ovs_barrier_block(struct ovs_barrier *barrier)
+{
+    uint64_t seq = seq_read(barrier->seq);
+    uint32_t orig;
+
+    atomic_add(&barrier->count, 1, &orig);
+    if (orig + 1 == barrier->size) {
+        atomic_store(&barrier->count, 0);
+        seq_change(barrier->seq);
     }
 
-    return error;
+    /* To prevent thread from waking up by other event,
+     * keeps waiting for the change of 'barrier->seq'. */
+    while (seq == seq_read(barrier->seq)) {
+        seq_wait(barrier->seq, seq);
+        poll_block();
+    }
 }
 \f
 DEFINE_EXTERN_PER_THREAD_DATA(ovsthread_id, 0);
index 68db71f..6565abf 100644 (file)
@@ -23,6 +23,7 @@
 #include "ovs-atomic.h"
 #include "util.h"
 
+struct seq;
 
 /* Mutex. */
 struct OVS_LOCKABLE ovs_mutex {
@@ -30,6 +31,13 @@ struct OVS_LOCKABLE ovs_mutex {
     const char *where;          /* NULL if and only if uninitialized. */
 };
 
+/* Poll-block()-able barrier similar to pthread_barrier_t. */
+struct ovs_barrier {
+    uint32_t size;            /* Number of threads to wait. */
+    atomic_uint32_t count;    /* Number of threads already hit the barrier. */
+    struct seq *seq;
+};
+
 /* "struct ovs_mutex" initializer. */
 #ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
 #define OVS_MUTEX_INITIALIZER { PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP, \
@@ -139,6 +147,11 @@ int ovs_rwlock_tryrdlock_at(const struct ovs_rwlock *rwlock, const char *where)
 #define ovs_rwlock_tryrdlock(rwlock) \
         ovs_rwlock_tryrdlock_at(rwlock, SOURCE_LOCATOR)
 
+/* ovs_barrier functions analogous to pthread_barrier_*() functions. */
+void ovs_barrier_init(struct ovs_barrier *, uint32_t count);
+void ovs_barrier_destroy(struct ovs_barrier *);
+void ovs_barrier_block(struct ovs_barrier *);
+
 /* Wrappers for xpthread_cond_*() that abort the process on any error.
  *
  * Use ovs_mutex_cond_wait() to wait for a condition. */
@@ -147,12 +160,6 @@ void xpthread_cond_destroy(pthread_cond_t *);
 void xpthread_cond_signal(pthread_cond_t *);
 void xpthread_cond_broadcast(pthread_cond_t *);
 
-/* Wrappers for pthread_barrier_*() that abort the process on any error. */
-void xpthread_barrier_init(pthread_barrier_t *, pthread_barrierattr_t *,
-                           unsigned int count);
-int xpthread_barrier_wait(pthread_barrier_t *);
-void xpthread_barrier_destroy(pthread_barrier_t *);
-
 void xpthread_key_create(pthread_key_t *, void (*destructor)(void *));
 void xpthread_key_delete(pthread_key_t);
 void xpthread_setspecific(pthread_key_t, const void *);
index 74d9686..c8d35c8 100644 (file)
@@ -101,7 +101,7 @@ struct udpif {
     struct seq *reval_seq;             /* Incremented to force revalidation. */
     bool need_revalidate;              /* As indicated by 'reval_seq'. */
     bool reval_exit;                   /* Set by leader on 'exit_latch. */
-    pthread_barrier_t reval_barrier;   /* Barrier used by revalidators. */
+    struct ovs_barrier reval_barrier;  /* Barrier used by revalidators. */
     struct dpif_flow_dump dump;        /* DPIF flow dump state. */
     long long int dump_duration;       /* Duration of the last flow dump. */
     struct seq *dump_seq;              /* Increments each dump iteration. */
@@ -315,7 +315,7 @@ udpif_stop_threads(struct udpif *udpif)
 
         latch_poll(&udpif->exit_latch);
 
-        xpthread_barrier_destroy(&udpif->reval_barrier);
+        ovs_barrier_destroy(&udpif->reval_barrier);
 
         free(udpif->revalidators);
         udpif->revalidators = NULL;
@@ -352,8 +352,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers,
                 "handler", udpif_upcall_handler, handler);
         }
 
-        xpthread_barrier_init(&udpif->reval_barrier, NULL,
-                              udpif->n_revalidators);
+        ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators);
         udpif->reval_exit = false;
         udpif->revalidators = xzalloc(udpif->n_revalidators
                                       * sizeof *udpif->revalidators);
@@ -585,18 +584,18 @@ udpif_revalidator(void *arg)
         }
 
         /* Wait for the leader to start the flow dump. */
-        xpthread_barrier_wait(&udpif->reval_barrier);
+        ovs_barrier_block(&udpif->reval_barrier);
         if (udpif->reval_exit) {
             break;
         }
         revalidate(revalidator);
 
         /* Wait for all flows to have been dumped before we garbage collect. */
-        xpthread_barrier_wait(&udpif->reval_barrier);
+        ovs_barrier_block(&udpif->reval_barrier);
         revalidator_sweep(revalidator);
 
         /* Wait for all revalidators to finish garbage collection. */
-        xpthread_barrier_wait(&udpif->reval_barrier);
+        ovs_barrier_block(&udpif->reval_barrier);
 
         if (leader) {
             long long int duration;