netlink-notifier: Support multiple groups.
authorJarno Rajahalme <jarno@ovn.org>
Mon, 13 Jun 2016 21:22:32 +0000 (14:22 -0700)
committerJarno Rajahalme <jarno@ovn.org>
Mon, 13 Jun 2016 21:22:32 +0000 (14:22 -0700)
A netlink notifier ('nln') already supports multiple notifiers.  This
patch allows each of these notifiers to subscribe to a different
multicast group.  Sharing a single socket for multiple event types
(each on their own multicast group) provides serialization of events
when reordering of different event types could be problematic.  For
example, if a 'create' event and 'delete' event are on different
netlink multicast group, we may want to process those events in the
order in which kernel issued them, rather than in the order we happen
to check for them.

Moving the multicast group argument from nln_create() to
nln_notifier_create() allows each notifier to specify a different
multicast group.  The parse callback needs to identify the group the
message belonged to by returning the corresponding group number, or 0
when an parse error occurs.

Signed-off-by: Jarno Rajahalme <jarno@ovn.org>
Acked-by: Thadeu Lima de Souza Cascardo <cascardo@redhat.com>
lib/netlink-notifier.c
lib/netlink-notifier.h
lib/route-table.c
lib/rtnetlink.c
tests/test-netlink-conntrack.c

index c2b4f7b..f6d1e4d 100644 (file)
@@ -32,7 +32,7 @@ VLOG_DEFINE_THIS_MODULE(netlink_notifier);
 
 COVERAGE_DEFINE(nln_changed);
 
-static void nln_report(struct nln *nln, void *change);
+static void nln_report(const struct nln *nln, void *change, int group);
 
 struct nln {
     struct nl_sock *notify_sock; /* Netlink socket. */
@@ -40,7 +40,6 @@ struct nln {
     bool has_run;                /* Guard for run and wait functions. */
 
     /* Passed in by nln_create(). */
-    int multicast_group;         /* Multicast group we listen on. */
     int protocol;                /* Protocol passed to nl_sock_create(). */
     nln_parse_func *parse;       /* Message parsing function. */
     void *change;                /* Change passed to parse. */
@@ -50,6 +49,7 @@ struct nln_notifier {
     struct nln *nln;             /* Parent nln. */
 
     struct ovs_list node;
+    int multicast_group;         /* Multicast group we listen on. */
     nln_notify_func *cb;
     void *aux;
 };
@@ -60,15 +60,13 @@ struct nln_notifier {
  * Incoming messages will be parsed with 'parse' which will be passed 'change'
  * as an argument. */
 struct nln *
-nln_create(int protocol, int multicast_group, nln_parse_func *parse,
-           void *change)
+nln_create(int protocol, nln_parse_func *parse, void *change)
 {
     struct nln *nln;
 
     nln = xzalloc(sizeof *nln);
     nln->notify_sock = NULL;
     nln->protocol = protocol;
-    nln->multicast_group = multicast_group;
     nln->parse = parse;
     nln->change = change;
     nln->has_run = false;
@@ -101,20 +99,17 @@ nln_destroy(struct nln *nln)
  *
  * Returns an initialized nln_notifier if successful, otherwise NULL. */
 struct nln_notifier *
-nln_notifier_create(struct nln *nln, nln_notify_func *cb, void *aux)
+nln_notifier_create(struct nln *nln, int multicast_group, nln_notify_func *cb,
+                    void *aux)
 {
     struct nln_notifier *notifier;
+    int error;
 
     if (!nln->notify_sock) {
         struct nl_sock *sock;
-        int error;
 
         error = nl_sock_create(nln->protocol, &sock);
-        if (!error) {
-            error = nl_sock_join_mcgroup(sock, nln->multicast_group);
-        }
         if (error) {
-            nl_sock_destroy(sock);
             VLOG_WARN("could not create netlink socket: %s",
                       ovs_strerror(error));
             return NULL;
@@ -126,11 +121,21 @@ nln_notifier_create(struct nln *nln, nln_notify_func *cb, void *aux)
         nln_run(nln);
     }
 
+    error = nl_sock_join_mcgroup(nln->notify_sock, multicast_group);
+    if (error) {
+        VLOG_WARN("could not join netlink multicast group: %s",
+                  ovs_strerror(error));
+        return NULL;
+    }
+
     notifier = xmalloc(sizeof *notifier);
-    ovs_list_push_back(&nln->all_notifiers, &notifier->node);
+    notifier->multicast_group = multicast_group;
     notifier->cb = cb;
     notifier->aux = aux;
     notifier->nln = nln;
+
+    ovs_list_push_back(&nln->all_notifiers, &notifier->node);
+
     return notifier;
 }
 
@@ -141,8 +146,21 @@ nln_notifier_destroy(struct nln_notifier *notifier)
 {
     if (notifier) {
         struct nln *nln = notifier->nln;
+        struct nln_notifier *iter;
+        int count = 0;
 
         ovs_list_remove(&notifier->node);
+
+        /* Leave the group if no other notifier is interested in it. */
+        LIST_FOR_EACH (iter, node, &nln->all_notifiers) {
+            if (iter->multicast_group == notifier->multicast_group) {
+                count++;
+            }
+        }
+        if (count == 0) {
+            nl_sock_leave_mcgroup(nln->notify_sock, notifier->multicast_group);
+        }
+
         if (ovs_list_is_empty(&nln->all_notifiers)) {
             nl_sock_destroy(nln->notify_sock);
             nln->notify_sock = NULL;
@@ -171,11 +189,13 @@ nln_run(struct nln *nln)
         ofpbuf_use_stub(&buf, buf_stub, sizeof buf_stub);
         error = nl_sock_recv(nln->notify_sock, &buf, false);
         if (!error) {
-            if (nln->parse(&buf, nln->change)) {
-                nln_report(nln, nln->change);
+            int group = nln->parse(&buf, nln->change);
+
+            if (group != 0) {
+                nln_report(nln, nln->change, group);
             } else {
                 VLOG_WARN_RL(&rl, "received bad netlink message");
-                nln_report(nln, NULL);
+                nln_report(nln, NULL, 0);
             }
             ofpbuf_uninit(&buf);
         } else if (error == EAGAIN) {
@@ -184,7 +204,7 @@ nln_run(struct nln *nln)
             if (error == ENOBUFS) {
                 /* The socket buffer might be full, there could be too many
                  * notifications, so it makes sense to call nln_report() */
-                nln_report(nln, NULL);
+                nln_report(nln, NULL, 0);
                 VLOG_WARN_RL(&rl, "netlink receive buffer overflowed");
             } else {
                 VLOG_WARN_RL(&rl, "error reading netlink socket: %s",
@@ -206,7 +226,7 @@ nln_wait(struct nln *nln)
 }
 
 static void
-nln_report(struct nln *nln, void *change)
+nln_report(const struct nln *nln, void *change, int group)
 {
     struct nln_notifier *notifier;
 
@@ -215,6 +235,8 @@ nln_report(struct nln *nln, void *change)
     }
 
     LIST_FOR_EACH (notifier, node, &nln->all_notifiers) {
-        notifier->cb(change, notifier->aux);
+        if (!change || group == notifier->multicast_group) {
+            notifier->cb(change, notifier->aux);
+        }
     }
 }
index da72fc7..f6a5150 100644 (file)
@@ -36,14 +36,15 @@ struct ofpbuf;
 typedef void nln_notify_func(const void *change, void *aux);
 
 /* Function called to parse incoming nln notifications.  The 'buf' message
- * should be parsed into 'change' as specified in nln_create(). */
-typedef bool nln_parse_func(struct ofpbuf *buf, void *change);
+ * should be parsed into 'change' as specified in nln_create().
+ * Returns the multicast_group the change belongs to, or 0 for a parse error.
+ */
+typedef int nln_parse_func(struct ofpbuf *buf, void *change);
 
-struct nln *nln_create(int protocol, int multicast_group, nln_parse_func *,
-                       void *change);
+struct nln *nln_create(int protocol, nln_parse_func *, void *change);
 void nln_destroy(struct nln *);
-struct nln_notifier *nln_notifier_create(struct nln *, nln_notify_func *,
-                                         void *aux);
+struct nln_notifier *nln_notifier_create(struct nln *, int multicast_group,
+                                         nln_notify_func *, void *aux);
 void nln_notifier_destroy(struct nln_notifier *);
 void nln_run(struct nln *);
 void nln_wait(struct nln *);
index d837d98..58e7f62 100644 (file)
@@ -62,7 +62,6 @@ static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 20);
 static uint64_t rt_change_seq;
 
 static struct nln *nln = NULL;
-static struct nln *nln6 = NULL;
 static struct route_table_msg rtmsg;
 static struct nln_notifier *route_notifier = NULL;
 static struct nln_notifier *route6_notifier = NULL;
@@ -72,7 +71,7 @@ static bool route_table_valid = false;
 
 static int route_table_reset(void);
 static void route_table_handle_msg(const struct route_table_msg *);
-static bool route_table_parse(struct ofpbuf *, struct route_table_msg *);
+static int route_table_parse(struct ofpbuf *, struct route_table_msg *);
 static void route_table_change(const struct route_table_msg *, void *);
 static void route_map_clear(void);
 
@@ -93,22 +92,19 @@ route_table_init(void)
 {
     ovs_mutex_lock(&route_table_mutex);
     ovs_assert(!nln);
-    ovs_assert(!nln6);
     ovs_assert(!route_notifier);
     ovs_assert(!route6_notifier);
 
     ovs_router_init();
-    nln = nln_create(NETLINK_ROUTE, RTNLGRP_IPV4_ROUTE,
-                     (nln_parse_func *) route_table_parse, &rtmsg);
-    nln6 = nln_create(NETLINK_ROUTE, RTNLGRP_IPV6_ROUTE,
-                      (nln_parse_func *) route_table_parse, &rtmsg);
+    nln = nln_create(NETLINK_ROUTE, (nln_parse_func *) route_table_parse,
+                     &rtmsg);
 
     route_notifier =
-        nln_notifier_create(nln, (nln_notify_func *) route_table_change,
-                            NULL);
+        nln_notifier_create(nln, RTNLGRP_IPV4_ROUTE,
+                            (nln_notify_func *) route_table_change, NULL);
     route6_notifier =
-        nln_notifier_create(nln6, (nln_notify_func *) route_table_change,
-                            NULL);
+        nln_notifier_create(nln, RTNLGRP_IPV6_ROUTE,
+                            (nln_notify_func *) route_table_change, NULL);
 
     route_table_reset();
     name_table_init();
@@ -122,14 +118,9 @@ route_table_run(void)
     OVS_EXCLUDED(route_table_mutex)
 {
     ovs_mutex_lock(&route_table_mutex);
-    if (nln || nln6) {
+    if (nln) {
         rtnetlink_run();
-        if (nln) {
-            nln_run(nln);
-        }
-        if (nln6) {
-            nln_run(nln6);
-        }
+        nln_run(nln);
 
         if (!route_table_valid) {
             route_table_reset();
@@ -144,14 +135,9 @@ route_table_wait(void)
     OVS_EXCLUDED(route_table_mutex)
 {
     ovs_mutex_lock(&route_table_mutex);
-    if (nln || nln6) {
+    if (nln) {
         rtnetlink_wait();
-        if (nln) {
-            nln_wait(nln);
-        }
-        if (nln6) {
-            nln_wait(nln6);
-        }
+        nln_wait(nln);
     }
     ovs_mutex_unlock(&route_table_mutex);
 }
@@ -191,7 +177,9 @@ route_table_reset(void)
     return nl_dump_done(&dump);
 }
 
-static bool
+/* Return RTNLGRP_IPV4_ROUTE or RTNLGRP_IPV6_ROUTE on success, 0 on parse
+ * error. */
+static int
 route_table_parse(struct ofpbuf *buf, struct route_table_msg *change)
 {
     bool parsed, ipv4 = false;
@@ -222,7 +210,7 @@ route_table_parse(struct ofpbuf *buf, struct route_table_msg *change)
                                  policy6, attrs, ARRAY_SIZE(policy6));
     } else {
         VLOG_DBG_RL(&rl, "received non AF_INET rtnetlink route message");
-        return false;
+        return 0;
     }
 
     if (parsed) {
@@ -255,7 +243,7 @@ route_table_parse(struct ofpbuf *buf, struct route_table_msg *change)
                 if (error == ENXIO) {
                     change->relevant = false;
                 } else {
-                    return false;
+                    return 0;
                 }
             }
         }
@@ -282,9 +270,11 @@ route_table_parse(struct ofpbuf *buf, struct route_table_msg *change)
         }
     } else {
         VLOG_DBG_RL(&rl, "received unparseable rtnetlink route message");
+        return 0;
     }
 
-    return parsed;
+    /* Success. */
+    return ipv4 ? RTNLGRP_IPV4_ROUTE : RTNLGRP_IPV6_ROUTE;
 }
 
 static void
index f4d8f22..5009cd5 100644 (file)
@@ -125,10 +125,11 @@ rtnetlink_parse(struct ofpbuf *buf, struct rtnetlink_change *change)
     return parsed;
 }
 
-static bool
+/* Return RTNLGRP_LINK on success, 0 on parse error. */
+static int
 rtnetlink_parse_cb(struct ofpbuf *buf, void *change)
 {
-    return rtnetlink_parse(buf, change);
+    return rtnetlink_parse(buf, change) ? RTNLGRP_LINK : 0;
 }
 
 /* Registers 'cb' to be called with auxiliary data 'aux' with network device
@@ -146,11 +147,10 @@ struct nln_notifier *
 rtnetlink_notifier_create(rtnetlink_notify_func *cb, void *aux)
 {
     if (!nln) {
-        nln = nln_create(NETLINK_ROUTE, RTNLGRP_LINK, rtnetlink_parse_cb,
-                         &rtn_change);
+        nln = nln_create(NETLINK_ROUTE, rtnetlink_parse_cb, &rtn_change);
     }
 
-    return nln_notifier_create(nln, (nln_notify_func *) cb, aux);
+    return nln_notifier_create(nln, RTNLGRP_LINK, (nln_notify_func *) cb, aux);
 }
 
 /* Destroys 'notifier', which must have previously been created with
index b18d9d6..62bef13 100644 (file)
@@ -30,12 +30,22 @@ struct test_change {
     struct ct_dpif_entry entry;
 };
 
-static bool
+static int
 event_parse(struct ofpbuf *buf, void *change_)
 {
     struct test_change *change = change_;
 
-    return nl_ct_parse_entry(buf, &change->entry, &change->type);
+    if (nl_ct_parse_entry(buf, &change->entry, &change->type)) {
+        switch (change->type) {
+        case NL_CT_EVENT_NEW:
+            return NFNLGRP_CONNTRACK_NEW;
+        case NL_CT_EVENT_UPDATE:
+            return NFNLGRP_CONNTRACK_UPDATE;
+        case NL_CT_EVENT_DELETE:
+            return NFNLGRP_CONNTRACK_DESTROY;
+        }
+    }
+    return 0;
 }
 
 static void
@@ -62,32 +72,29 @@ test_nl_ct_monitor(struct ovs_cmdl_context *ctx OVS_UNUSED)
         NFNLGRP_CONNTRACK_UPDATE,
     };
 
-    struct nln *nlns[ARRAY_SIZE(groups)];
+    struct nln *nln;
     struct nln_notifier *notifiers[ARRAY_SIZE(groups)];
 
     struct test_change change;
 
     unsigned i;
 
-    for (i = 0; i < ARRAY_SIZE(groups); i++) {
-        nlns[i] = nln_create(NETLINK_NETFILTER, groups[i], event_parse,
-                             &change);
+    nln = nln_create(NETLINK_NETFILTER, event_parse, &change);
 
-        notifiers[i] = nln_notifier_create(nlns[i], event_print, NULL);
+    for (i = 0; i < ARRAY_SIZE(groups); i++) {
+        notifiers[i] = nln_notifier_create(nln, groups[i], event_print, NULL);
     }
 
     for (;;) {
-        for (i = 0; i < ARRAY_SIZE(groups); i++) {
-            nln_run(nlns[i]);
-            nln_wait(nlns[i]);
-        }
+        nln_run(nln);
+        nln_wait(nln);
         poll_block();
     }
 
     for (i = 0; i < ARRAY_SIZE(groups); i++) {
         nln_notifier_destroy(notifiers[i]);
-        nln_destroy(nlns[i]);
     }
+    nln_destroy(nln);
 }
 \f
 /* Dump command */