ofproto-dpif-upcall: Batch upcalls.
authorJarno Rajahalme <jrajahalme@nicira.com>
Wed, 28 Aug 2013 19:06:07 +0000 (12:06 -0700)
committerBen Pfaff <blp@nicira.com>
Wed, 28 Aug 2013 19:53:27 +0000 (12:53 -0700)
Batching reduces overheads and enables upto 4 times the upcall processing
performance in a specialized test case.

Signed-off-by: Jarno Rajahalme <jrajahalme@nicira.com>
Signed-off-by: Ben Pfaff <blp@nicira.com>
ofproto/ofproto-dpif-upcall.c

index db78af3..22cdf1b 100644 (file)
@@ -55,6 +55,8 @@ struct handler {
     struct list upcalls OVS_GUARDED;
     size_t n_upcalls OVS_GUARDED;
 
+    size_t n_new_upcalls;              /* Only changed by the dispatcher. */
+
     pthread_cond_t wake_cond;          /* Wakes 'thread' while holding
                                           'mutex'. */
 };
@@ -515,6 +517,10 @@ static void
 recv_upcalls(struct udpif *udpif)
 {
     static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(60, 60);
+    size_t n_udpif_new_upcalls = 0;
+    struct handler *handler;
+    int n;
+
     for (;;) {
         struct upcall *upcall;
         int error;
@@ -535,7 +541,6 @@ recv_upcalls(struct udpif *udpif)
         } else if (upcall->type == MISS_UPCALL) {
             struct dpif_upcall *dupcall = &upcall->dpif_upcall;
             uint32_t hash = udpif->secret;
-            struct handler *handler;
             struct nlattr *nla;
             size_t n_bytes, left;
 
@@ -561,8 +566,11 @@ recv_upcalls(struct udpif *udpif)
             ovs_mutex_lock(&handler->mutex);
             if (handler->n_upcalls < MAX_QUEUE_LENGTH) {
                 list_push_back(&handler->upcalls, &upcall->list_node);
-                handler->n_upcalls++;
-                xpthread_cond_signal(&handler->wake_cond);
+                handler->n_new_upcalls = ++handler->n_upcalls;
+
+                if (handler->n_new_upcalls >= FLOW_MISS_MAX_BATCH) {
+                    xpthread_cond_signal(&handler->wake_cond);
+                }
                 ovs_mutex_unlock(&handler->mutex);
                 if (!VLOG_DROP_DBG(&rl)) {
                     struct ds ds = DS_EMPTY_INITIALIZER;
@@ -581,10 +589,13 @@ recv_upcalls(struct udpif *udpif)
         } else {
             ovs_mutex_lock(&udpif->upcall_mutex);
             if (udpif->n_upcalls < MAX_QUEUE_LENGTH) {
-                udpif->n_upcalls++;
+                n_udpif_new_upcalls = ++udpif->n_upcalls;
                 list_push_back(&udpif->upcalls, &upcall->list_node);
                 ovs_mutex_unlock(&udpif->upcall_mutex);
-                seq_change(udpif->wait_seq);
+
+                if (n_udpif_new_upcalls >= FLOW_MISS_MAX_BATCH) {
+                    seq_change(udpif->wait_seq);
+                }
             } else {
                 ovs_mutex_unlock(&udpif->upcall_mutex);
                 COVERAGE_INC(upcall_queue_overflow);
@@ -592,6 +603,18 @@ recv_upcalls(struct udpif *udpif)
             }
         }
     }
+    for (n = 0; n < udpif->n_handlers; ++n) {
+        handler = &udpif->handlers[n];
+        if (handler->n_new_upcalls) {
+            handler->n_new_upcalls = 0;
+            ovs_mutex_lock(&handler->mutex);
+            xpthread_cond_signal(&handler->wake_cond);
+            ovs_mutex_unlock(&handler->mutex);
+        }
+    }
+    if (n_udpif_new_upcalls) {
+        seq_change(udpif->wait_seq);
+    }
 }
 
 static struct flow_miss *