tests: Add bundle action test with buffer realloc.
[cascardo/ovs.git] / lib / ovs-rcu.c
index c1ac61a..b8f8bc4 100644 (file)
 
 #include <config.h>
 #include "ovs-rcu.h"
+#include "fatal-signal.h"
 #include "guarded-list.h"
 #include "list.h"
 #include "ovs-thread.h"
 #include "poll-loop.h"
 #include "seq.h"
+#include "timeval.h"
+#include "openvswitch/vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(ovs_rcu);
 
 struct ovsrcu_cb {
     void (*function)(void *aux);
@@ -28,48 +33,52 @@ struct ovsrcu_cb {
 };
 
 struct ovsrcu_cbset {
-    struct list list_node;
+    struct ovs_list list_node;
     struct ovsrcu_cb cbs[16];
     int n_cbs;
 };
 
 struct ovsrcu_perthread {
-    struct list list_node;           /* In global list. */
+    struct ovs_list list_node;  /* In global list. */
 
     struct ovs_mutex mutex;
     uint64_t seqno;
     struct ovsrcu_cbset *cbset;
+    char name[16];              /* This thread's name. */
 };
 
 static struct seq *global_seqno;
 
 static pthread_key_t perthread_key;
-static struct list ovsrcu_threads;
+static struct ovs_list ovsrcu_threads;
 static struct ovs_mutex ovsrcu_threads_mutex;
 
 static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
-static void ovsrcu_init(void);
+static void ovsrcu_init_module(void);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
-static void ovsrcu_synchronize(void);
 
 static struct ovsrcu_perthread *
 ovsrcu_perthread_get(void)
 {
     struct ovsrcu_perthread *perthread;
 
-    ovsrcu_init();
+    ovsrcu_init_module();
 
     perthread = pthread_getspecific(perthread_key);
     if (!perthread) {
+        const char *name = get_subprogram_name();
+
         perthread = xmalloc(sizeof *perthread);
         ovs_mutex_init(&perthread->mutex);
         perthread->seqno = seq_read(global_seqno);
         perthread->cbset = NULL;
+        ovs_strlcpy(perthread->name, name[0] ? name : "main",
+                    sizeof perthread->name);
 
         ovs_mutex_lock(&ovsrcu_threads_mutex);
         list_push_back(&ovsrcu_threads, &perthread->list_node);
@@ -112,7 +121,7 @@ ovsrcu_quiesce_start(void)
 {
     struct ovsrcu_perthread *perthread;
 
-    ovsrcu_init();
+    ovsrcu_init_module();
     perthread = pthread_getspecific(perthread_key);
     if (perthread) {
         pthread_setspecific(perthread_key, NULL);
@@ -123,12 +132,20 @@ ovsrcu_quiesce_start(void)
 }
 
 /* Indicates a momentary quiescent state.  See "Details" near the top of
- * ovs-rcu.h. */
+ * ovs-rcu.h.
+ *
+ * Provides a full memory barrier via seq_change().
+ */
 void
 ovsrcu_quiesce(void)
 {
-    ovsrcu_init();
-    ovsrcu_perthread_get()->seqno = seq_read(global_seqno);
+    struct ovsrcu_perthread *perthread;
+
+    perthread = ovsrcu_perthread_get();
+    perthread->seqno = seq_read(global_seqno);
+    if (perthread->cbset) {
+        ovsrcu_flush_cbset(perthread);
+    }
     seq_change(global_seqno);
 
     ovsrcu_quiesced();
@@ -137,14 +154,16 @@ ovsrcu_quiesce(void)
 bool
 ovsrcu_is_quiescent(void)
 {
-    ovsrcu_init();
+    ovsrcu_init_module();
     return pthread_getspecific(perthread_key) == NULL;
 }
 
-static void
+void
 ovsrcu_synchronize(void)
 {
+    unsigned int warning_threshold = 1000;
     uint64_t target_seqno;
+    long long int start;
 
     if (single_threaded()) {
         return;
@@ -152,15 +171,20 @@ ovsrcu_synchronize(void)
 
     target_seqno = seq_read(global_seqno);
     ovsrcu_quiesce_start();
+    start = time_msec();
 
     for (;;) {
         uint64_t cur_seqno = seq_read(global_seqno);
         struct ovsrcu_perthread *perthread;
+        char stalled_thread[16];
+        unsigned int elapsed;
         bool done = true;
 
         ovs_mutex_lock(&ovsrcu_threads_mutex);
         LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
             if (perthread->seqno <= target_seqno) {
+                ovs_strlcpy(stalled_thread, perthread->name,
+                            sizeof stalled_thread);
                 done = false;
                 break;
             }
@@ -171,6 +195,14 @@ ovsrcu_synchronize(void)
             break;
         }
 
+        elapsed = time_msec() - start;
+        if (elapsed >= warning_threshold) {
+            VLOG_WARN("blocked %u ms waiting for %s to quiesce",
+                      elapsed, stalled_thread);
+            warning_threshold *= 2;
+        }
+        poll_timer_wait_until(start + warning_threshold);
+
         seq_wait(global_seqno, cur_seqno);
         poll_block();
     }
@@ -180,6 +212,19 @@ ovsrcu_synchronize(void)
 /* Registers 'function' to be called, passing 'aux' as argument, after the
  * next grace period.
  *
+ * The call is guaranteed to happen after the next time all participating
+ * threads have quiesced at least once, but there is no quarantee that all
+ * registered functions are called as early as possible, or that the functions
+ * registered by different threads would be called in the order the
+ * registrations took place.  In particular, even if two threads provably
+ * register a function each in a specific order, the functions may still be
+ * called in the opposite order, depending on the timing of when the threads
+ * call ovsrcu_quiesce(), how many functions they postpone, and when the
+ * ovs-rcu thread happens to grab the functions to be called.
+ *
+ * All functions registered by a single thread are guaranteed to execute in the
+ * registering order, however.
+ *
  * This function is more conveniently called through the ovsrcu_postpone()
  * macro, which provides a type-safe way to allow 'function''s parameter to be
  * any pointer type. */
@@ -208,8 +253,8 @@ ovsrcu_postpone__(void (*function)(void *aux), void *aux)
 static bool
 ovsrcu_call_postponed(void)
 {
-    struct ovsrcu_cbset *cbset, *next_cbset;
-    struct list cbsets;
+    struct ovsrcu_cbset *cbset;
+    struct ovs_list cbsets;
 
     guarded_list_pop_all(&flushed_cbsets, &cbsets);
     if (list_is_empty(&cbsets)) {
@@ -218,13 +263,12 @@ ovsrcu_call_postponed(void)
 
     ovsrcu_synchronize();
 
-    LIST_FOR_EACH_SAFE (cbset, next_cbset, list_node, &cbsets) {
+    LIST_FOR_EACH_POP (cbset, list_node, &cbsets) {
         struct ovsrcu_cb *cb;
 
         for (cb = cbset->cbs; cb < &cbset->cbs[cbset->n_cbs]; cb++) {
             cb->function(cb->aux);
         }
-        list_remove(&cbset->list_node);
         free(cbset);
     }
 
@@ -283,13 +327,26 @@ ovsrcu_thread_exit_cb(void *perthread)
     ovsrcu_unregister__(perthread);
 }
 
+/* Cancels the callback to ovsrcu_thread_exit_cb().
+ *
+ * Cancelling the call to the destructor during the main thread exit
+ * is needed while using pthreads-win32 library in Windows. It has been
+ * observed that in pthreads-win32, a call to the destructor during
+ * main thread exit causes undefined behavior. */
+static void
+ovsrcu_cancel_thread_exit_cb(void *aux OVS_UNUSED)
+{
+    pthread_setspecific(perthread_key, NULL);
+}
+
 static void
-ovsrcu_init(void)
+ovsrcu_init_module(void)
 {
     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
     if (ovsthread_once_start(&once)) {
         global_seqno = seq_create();
         xpthread_key_create(&perthread_key, ovsrcu_thread_exit_cb);
+        fatal_signal_add_hook(ovsrcu_cancel_thread_exit_cb, NULL, NULL, true);
         list_init(&ovsrcu_threads);
         ovs_mutex_init(&ovsrcu_threads_mutex);