list: Rename struct list to struct ovs_list
[cascardo/ovs.git] / lib / ovs-rcu.c
index c1ac61a..1cf69ae 100644 (file)
 #include "ovs-thread.h"
 #include "poll-loop.h"
 #include "seq.h"
+#include "timeval.h"
+#include "vlog.h"
+
+VLOG_DEFINE_THIS_MODULE(ovs_rcu);
 
 struct ovsrcu_cb {
     void (*function)(void *aux);
@@ -28,48 +32,52 @@ struct ovsrcu_cb {
 };
 
 struct ovsrcu_cbset {
-    struct list list_node;
+    struct ovs_list list_node;
     struct ovsrcu_cb cbs[16];
     int n_cbs;
 };
 
 struct ovsrcu_perthread {
-    struct list list_node;           /* In global list. */
+    struct ovs_list list_node;  /* In global list. */
 
     struct ovs_mutex mutex;
     uint64_t seqno;
     struct ovsrcu_cbset *cbset;
+    char name[16];              /* This thread's name. */
 };
 
 static struct seq *global_seqno;
 
 static pthread_key_t perthread_key;
-static struct list ovsrcu_threads;
+static struct ovs_list ovsrcu_threads;
 static struct ovs_mutex ovsrcu_threads_mutex;
 
 static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
-static void ovsrcu_init(void);
+static void ovsrcu_init_module(void);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
 static void *ovsrcu_postpone_thread(void *arg OVS_UNUSED);
-static void ovsrcu_synchronize(void);
 
 static struct ovsrcu_perthread *
 ovsrcu_perthread_get(void)
 {
     struct ovsrcu_perthread *perthread;
 
-    ovsrcu_init();
+    ovsrcu_init_module();
 
     perthread = pthread_getspecific(perthread_key);
     if (!perthread) {
+        const char *name = get_subprogram_name();
+
         perthread = xmalloc(sizeof *perthread);
         ovs_mutex_init(&perthread->mutex);
         perthread->seqno = seq_read(global_seqno);
         perthread->cbset = NULL;
+        ovs_strlcpy(perthread->name, name[0] ? name : "main",
+                    sizeof perthread->name);
 
         ovs_mutex_lock(&ovsrcu_threads_mutex);
         list_push_back(&ovsrcu_threads, &perthread->list_node);
@@ -112,7 +120,7 @@ ovsrcu_quiesce_start(void)
 {
     struct ovsrcu_perthread *perthread;
 
-    ovsrcu_init();
+    ovsrcu_init_module();
     perthread = pthread_getspecific(perthread_key);
     if (perthread) {
         pthread_setspecific(perthread_key, NULL);
@@ -123,12 +131,20 @@ ovsrcu_quiesce_start(void)
 }
 
 /* Indicates a momentary quiescent state.  See "Details" near the top of
- * ovs-rcu.h. */
+ * ovs-rcu.h.
+ *
+ * Provides a full memory barrier via seq_change().
+ */
 void
 ovsrcu_quiesce(void)
 {
-    ovsrcu_init();
-    ovsrcu_perthread_get()->seqno = seq_read(global_seqno);
+    struct ovsrcu_perthread *perthread;
+
+    perthread = ovsrcu_perthread_get();
+    perthread->seqno = seq_read(global_seqno);
+    if (perthread->cbset) {
+        ovsrcu_flush_cbset(perthread);
+    }
     seq_change(global_seqno);
 
     ovsrcu_quiesced();
@@ -137,14 +153,16 @@ ovsrcu_quiesce(void)
 bool
 ovsrcu_is_quiescent(void)
 {
-    ovsrcu_init();
+    ovsrcu_init_module();
     return pthread_getspecific(perthread_key) == NULL;
 }
 
-static void
+void
 ovsrcu_synchronize(void)
 {
+    unsigned int warning_threshold = 1000;
     uint64_t target_seqno;
+    long long int start;
 
     if (single_threaded()) {
         return;
@@ -152,15 +170,20 @@ ovsrcu_synchronize(void)
 
     target_seqno = seq_read(global_seqno);
     ovsrcu_quiesce_start();
+    start = time_msec();
 
     for (;;) {
         uint64_t cur_seqno = seq_read(global_seqno);
         struct ovsrcu_perthread *perthread;
+        char stalled_thread[16];
+        unsigned int elapsed;
         bool done = true;
 
         ovs_mutex_lock(&ovsrcu_threads_mutex);
         LIST_FOR_EACH (perthread, list_node, &ovsrcu_threads) {
             if (perthread->seqno <= target_seqno) {
+                ovs_strlcpy(stalled_thread, perthread->name,
+                            sizeof stalled_thread);
                 done = false;
                 break;
             }
@@ -171,6 +194,14 @@ ovsrcu_synchronize(void)
             break;
         }
 
+        elapsed = time_msec() - start;
+        if (elapsed >= warning_threshold) {
+            VLOG_WARN("blocked %u ms waiting for %s to quiesce",
+                      elapsed, stalled_thread);
+            warning_threshold *= 2;
+        }
+        poll_timer_wait_until(start + warning_threshold);
+
         seq_wait(global_seqno, cur_seqno);
         poll_block();
     }
@@ -209,7 +240,7 @@ static bool
 ovsrcu_call_postponed(void)
 {
     struct ovsrcu_cbset *cbset, *next_cbset;
-    struct list cbsets;
+    struct ovs_list cbsets;
 
     guarded_list_pop_all(&flushed_cbsets, &cbsets);
     if (list_is_empty(&cbsets)) {
@@ -284,7 +315,7 @@ ovsrcu_thread_exit_cb(void *perthread)
 }
 
 static void
-ovsrcu_init(void)
+ovsrcu_init_module(void)
 {
     static struct ovsthread_once once = OVSTHREAD_ONCE_INITIALIZER;
     if (ovsthread_once_start(&once)) {