From: Alex Wang Date: Thu, 29 May 2014 22:37:37 +0000 (-0700) Subject: ovs-thread: Implement OVS specific barrier. X-Git-Tag: v2.3~37 X-Git-Url: http://git.cascardo.eti.br/?p=cascardo%2Fovs.git;a=commitdiff_plain;h=6b1585a0ffeb8f7cb2e11bbe45b083b9804a6eaf ovs-thread: Implement OVS specific barrier. Non-leader revalidator thread uses pthread_barrier_* functions in their main loop to synchronize with leader thread. However, since those threads only call poll_block() intermittently, the poll interval check in poll_block() can wrongly take the time since last call as poll interval and issue the following warnings: "Unreasonably long XXXXms poll interval". To prevent it, this commit implements the barrier struct and operations for OVS which allow thread to block on barrier via poll_block(). Signed-off-by: Alex Wang Acked-by: Ben Pfaff --- diff --git a/lib/ovs-thread.c b/lib/ovs-thread.c index 49b347418..176f434d9 100644 --- a/lib/ovs-thread.c +++ b/lib/ovs-thread.c @@ -24,6 +24,7 @@ #include "hash.h" #include "ovs-rcu.h" #include "poll-loop.h" +#include "seq.h" #include "socket-util.h" #include "util.h" @@ -166,10 +167,6 @@ XPTHREAD_FUNC1(pthread_cond_destroy, pthread_cond_t *); XPTHREAD_FUNC1(pthread_cond_signal, pthread_cond_t *); XPTHREAD_FUNC1(pthread_cond_broadcast, pthread_cond_t *); -XPTHREAD_FUNC3(pthread_barrier_init, pthread_barrier_t *, - pthread_barrierattr_t *, unsigned int); -XPTHREAD_FUNC1(pthread_barrier_destroy, pthread_barrier_t *); - XPTHREAD_FUNC2(pthread_join, pthread_t, void **); typedef void destructor_func(void *); @@ -255,20 +252,43 @@ ovs_mutex_cond_wait(pthread_cond_t *cond, const struct ovs_mutex *mutex_) } } -int -xpthread_barrier_wait(pthread_barrier_t *barrier) +/* Initializes the 'barrier'. 'size' is the number of threads + * expected to hit the barrier. */ +void +ovs_barrier_init(struct ovs_barrier *barrier, uint32_t size) { - int error; + barrier->size = size; + atomic_init(&barrier->count, 0); + barrier->seq = seq_create(); +} - ovsrcu_quiesce_start(); - error = pthread_barrier_wait(barrier); - ovsrcu_quiesce_end(); +/* Destroys the 'barrier'. */ +void +ovs_barrier_destroy(struct ovs_barrier *barrier) +{ + seq_destroy(barrier->seq); +} - if (error && OVS_UNLIKELY(error != PTHREAD_BARRIER_SERIAL_THREAD)) { - ovs_abort(error, "pthread_barrier_wait failed"); +/* Makes the calling thread block on the 'barrier' until all + * 'barrier->size' threads hit the barrier. */ +void +ovs_barrier_block(struct ovs_barrier *barrier) +{ + uint64_t seq = seq_read(barrier->seq); + uint32_t orig; + + atomic_add(&barrier->count, 1, &orig); + if (orig + 1 == barrier->size) { + atomic_store(&barrier->count, 0); + seq_change(barrier->seq); } - return error; + /* To prevent thread from waking up by other event, + * keeps waiting for the change of 'barrier->seq'. */ + while (seq == seq_read(barrier->seq)) { + seq_wait(barrier->seq, seq); + poll_block(); + } } DEFINE_EXTERN_PER_THREAD_DATA(ovsthread_id, 0); diff --git a/lib/ovs-thread.h b/lib/ovs-thread.h index 68db71fba..6565abf17 100644 --- a/lib/ovs-thread.h +++ b/lib/ovs-thread.h @@ -23,6 +23,7 @@ #include "ovs-atomic.h" #include "util.h" +struct seq; /* Mutex. */ struct OVS_LOCKABLE ovs_mutex { @@ -30,6 +31,13 @@ struct OVS_LOCKABLE ovs_mutex { const char *where; /* NULL if and only if uninitialized. */ }; +/* Poll-block()-able barrier similar to pthread_barrier_t. */ +struct ovs_barrier { + uint32_t size; /* Number of threads to wait. */ + atomic_uint32_t count; /* Number of threads already hit the barrier. */ + struct seq *seq; +}; + /* "struct ovs_mutex" initializer. */ #ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP #define OVS_MUTEX_INITIALIZER { PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP, \ @@ -139,6 +147,11 @@ int ovs_rwlock_tryrdlock_at(const struct ovs_rwlock *rwlock, const char *where) #define ovs_rwlock_tryrdlock(rwlock) \ ovs_rwlock_tryrdlock_at(rwlock, SOURCE_LOCATOR) +/* ovs_barrier functions analogous to pthread_barrier_*() functions. */ +void ovs_barrier_init(struct ovs_barrier *, uint32_t count); +void ovs_barrier_destroy(struct ovs_barrier *); +void ovs_barrier_block(struct ovs_barrier *); + /* Wrappers for xpthread_cond_*() that abort the process on any error. * * Use ovs_mutex_cond_wait() to wait for a condition. */ @@ -147,12 +160,6 @@ void xpthread_cond_destroy(pthread_cond_t *); void xpthread_cond_signal(pthread_cond_t *); void xpthread_cond_broadcast(pthread_cond_t *); -/* Wrappers for pthread_barrier_*() that abort the process on any error. */ -void xpthread_barrier_init(pthread_barrier_t *, pthread_barrierattr_t *, - unsigned int count); -int xpthread_barrier_wait(pthread_barrier_t *); -void xpthread_barrier_destroy(pthread_barrier_t *); - void xpthread_key_create(pthread_key_t *, void (*destructor)(void *)); void xpthread_key_delete(pthread_key_t); void xpthread_setspecific(pthread_key_t, const void *); diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c index 74d9686e1..c8d35c804 100644 --- a/ofproto/ofproto-dpif-upcall.c +++ b/ofproto/ofproto-dpif-upcall.c @@ -101,7 +101,7 @@ struct udpif { struct seq *reval_seq; /* Incremented to force revalidation. */ bool need_revalidate; /* As indicated by 'reval_seq'. */ bool reval_exit; /* Set by leader on 'exit_latch. */ - pthread_barrier_t reval_barrier; /* Barrier used by revalidators. */ + struct ovs_barrier reval_barrier; /* Barrier used by revalidators. */ struct dpif_flow_dump dump; /* DPIF flow dump state. */ long long int dump_duration; /* Duration of the last flow dump. */ struct seq *dump_seq; /* Increments each dump iteration. */ @@ -315,7 +315,7 @@ udpif_stop_threads(struct udpif *udpif) latch_poll(&udpif->exit_latch); - xpthread_barrier_destroy(&udpif->reval_barrier); + ovs_barrier_destroy(&udpif->reval_barrier); free(udpif->revalidators); udpif->revalidators = NULL; @@ -352,8 +352,7 @@ udpif_start_threads(struct udpif *udpif, size_t n_handlers, "handler", udpif_upcall_handler, handler); } - xpthread_barrier_init(&udpif->reval_barrier, NULL, - udpif->n_revalidators); + ovs_barrier_init(&udpif->reval_barrier, udpif->n_revalidators); udpif->reval_exit = false; udpif->revalidators = xzalloc(udpif->n_revalidators * sizeof *udpif->revalidators); @@ -585,18 +584,18 @@ udpif_revalidator(void *arg) } /* Wait for the leader to start the flow dump. */ - xpthread_barrier_wait(&udpif->reval_barrier); + ovs_barrier_block(&udpif->reval_barrier); if (udpif->reval_exit) { break; } revalidate(revalidator); /* Wait for all flows to have been dumped before we garbage collect. */ - xpthread_barrier_wait(&udpif->reval_barrier); + ovs_barrier_block(&udpif->reval_barrier); revalidator_sweep(revalidator); /* Wait for all revalidators to finish garbage collection. */ - xpthread_barrier_wait(&udpif->reval_barrier); + ovs_barrier_block(&udpif->reval_barrier); if (leader) { long long int duration;