dpif-netdev: Remove PMD latency on seq_mutex
authorFlavio Leitner <fbl@redhat.com>
Tue, 5 Jul 2016 13:33:38 +0000 (10:33 -0300)
committerDaniele Di Proietto <diproiettod@vmware.com>
Fri, 8 Jul 2016 23:17:20 +0000 (16:17 -0700)
The PMD thread needs to keep processing RX queues in order
to achieve maximum throughput. It also needs to sweep emc
cache and quiesce which use seq_mutex. That mutex can
eventually block the PMD thread causing latency spikes and
affecting the throughput.

Since there is no requirement for running those tasks at a
specific time, this patch extend seq API to allow tentative
locking instead.

Reported-by: Karl Rister <krister@redhat.com>
Co-authored-by: Karl Rister <krister@redhat.com>
Signed-off-by: Flavio Leitner <fbl@redhat.com>
Signed-off-by: Daniele Di Proietto <diproiettod@vmware.com>
lib/dpif-netdev.c
lib/ovs-rcu.c
lib/ovs-rcu.h
lib/seq.c
lib/seq.h

index fec7615..e0107b7 100644 (file)
@@ -2871,9 +2871,10 @@ reload:
 
             lc = 0;
 
-            emc_cache_slow_sweep(&pmd->flow_cache);
             coverage_try_clear();
-            ovsrcu_quiesce();
+            if (!ovsrcu_try_quiesce()) {
+                emc_cache_slow_sweep(&pmd->flow_cache);
+            }
 
             atomic_read_relaxed(&pmd->change_seq, &seq);
             if (seq != port_seq) {
index 0ff508e..8aef1f1 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #include <config.h>
+#include <errno.h>
 #include "ovs-rcu.h"
 #include "fatal-signal.h"
 #include "guarded-list.h"
@@ -57,6 +58,7 @@ static struct guarded_list flushed_cbsets;
 static struct seq *flushed_cbsets_seq;
 
 static void ovsrcu_init_module(void);
+static void ovsrcu_flush_cbset__(struct ovsrcu_perthread *, bool);
 static void ovsrcu_flush_cbset(struct ovsrcu_perthread *);
 static void ovsrcu_unregister__(struct ovsrcu_perthread *);
 static bool ovsrcu_call_postponed(void);
@@ -151,6 +153,27 @@ ovsrcu_quiesce(void)
     ovsrcu_quiesced();
 }
 
+int
+ovsrcu_try_quiesce(void)
+{
+    struct ovsrcu_perthread *perthread;
+    int ret = EBUSY;
+
+    ovs_assert(!single_threaded());
+    perthread = ovsrcu_perthread_get();
+    if (!seq_try_lock()) {
+        perthread->seqno = seq_read_protected(global_seqno);
+        if (perthread->cbset) {
+            ovsrcu_flush_cbset__(perthread, true);
+        }
+        seq_change_protected(global_seqno);
+        seq_unlock();
+        ovsrcu_quiesced();
+        ret = 0;
+    }
+    return ret;
+}
+
 bool
 ovsrcu_is_quiescent(void)
 {
@@ -292,7 +315,7 @@ ovsrcu_postpone_thread(void *arg OVS_UNUSED)
 }
 
 static void
-ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+ovsrcu_flush_cbset__(struct ovsrcu_perthread *perthread, bool protected)
 {
     struct ovsrcu_cbset *cbset = perthread->cbset;
 
@@ -300,10 +323,20 @@ ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
         guarded_list_push_back(&flushed_cbsets, &cbset->list_node, SIZE_MAX);
         perthread->cbset = NULL;
 
-        seq_change(flushed_cbsets_seq);
+        if (protected) {
+            seq_change_protected(flushed_cbsets_seq);
+        } else {
+            seq_change(flushed_cbsets_seq);
+        }
     }
 }
 
+static void
+ovsrcu_flush_cbset(struct ovsrcu_perthread *perthread)
+{
+    ovsrcu_flush_cbset__(perthread, false);
+}
+
 static void
 ovsrcu_unregister__(struct ovsrcu_perthread *perthread)
 {
index 8750ead..dc75749 100644 (file)
@@ -217,6 +217,7 @@ void ovsrcu_postpone__(void (*function)(void *aux), void *aux);
 void ovsrcu_quiesce_start(void);
 void ovsrcu_quiesce_end(void);
 void ovsrcu_quiesce(void);
+int ovsrcu_try_quiesce(void);
 bool ovsrcu_is_quiescent(void);
 
 /* Synchronization.  Waits for all non-quiescent threads to quiesce at least
index 57e0775..b8b5b65 100644 (file)
--- a/lib/seq.c
+++ b/lib/seq.c
@@ -100,20 +100,61 @@ seq_destroy(struct seq *seq)
     ovs_mutex_unlock(&seq_mutex);
 }
 
+int
+seq_try_lock(void)
+{
+    return ovs_mutex_trylock(&seq_mutex);
+}
+
+void
+seq_lock(void)
+    OVS_ACQUIRES(seq_mutex)
+{
+    ovs_mutex_lock(&seq_mutex);
+}
+
+void
+seq_unlock(void)
+    OVS_RELEASES(seq_mutex)
+{
+    ovs_mutex_unlock(&seq_mutex);
+}
+
 /* Increments 'seq''s sequence number, waking up any threads that are waiting
  * on 'seq'. */
 void
-seq_change(struct seq *seq)
-    OVS_EXCLUDED(seq_mutex)
+seq_change_protected(struct seq *seq)
+    OVS_REQUIRES(seq_mutex)
 {
     COVERAGE_INC(seq_change);
 
-    ovs_mutex_lock(&seq_mutex);
     seq->value = seq_next++;
     seq_wake_waiters(seq);
+}
+
+/* Increments 'seq''s sequence number, waking up any threads that are waiting
+ * on 'seq'. */
+void
+seq_change(struct seq *seq)
+    OVS_EXCLUDED(seq_mutex)
+{
+    ovs_mutex_lock(&seq_mutex);
+    seq_change_protected(seq);
     ovs_mutex_unlock(&seq_mutex);
 }
 
+/* Returns 'seq''s current sequence number (which could change immediately).
+ *
+ * seq_read() and seq_wait() can be used together to yield a race-free wakeup
+ * when an object changes, even without an ability to lock the object.  See
+ * Usage in seq.h for details. */
+uint64_t
+seq_read_protected(const struct seq *seq)
+    OVS_REQUIRES(seq_mutex)
+{
+    return seq->value;
+}
+
 /* Returns 'seq''s current sequence number (which could change immediately).
  *
  * seq_read() and seq_wait() can be used together to yield a race-free wakeup
@@ -126,7 +167,7 @@ seq_read(const struct seq *seq)
     uint64_t value;
 
     ovs_mutex_lock(&seq_mutex);
-    value = seq->value;
+    value = seq_read_protected(seq);
     ovs_mutex_unlock(&seq_mutex);
 
     return value;
index f3f4b53..221ab9a 100644 (file)
--- a/lib/seq.h
+++ b/lib/seq.h
 struct seq *seq_create(void);
 void seq_destroy(struct seq *);
 void seq_change(struct seq *);
+void seq_change_protected(struct seq *);
+void seq_lock(void);
+int seq_try_lock(void);
+void seq_unlock(void);
 
 /* For observers. */
 uint64_t seq_read(const struct seq *);
+uint64_t seq_read_protected(const struct seq *);
 
 void seq_wait_at(const struct seq *, uint64_t value, const char *where);
 #define seq_wait(seq, value) seq_wait_at(seq, value, OVS_SOURCE_LOCATOR)