ovsdb: Refactor JSON-RPC database server implementation.
authorBen Pfaff <blp@nicira.com>
Fri, 13 Nov 2009 22:47:25 +0000 (14:47 -0800)
committerBen Pfaff <blp@nicira.com>
Mon, 16 Nov 2009 18:55:29 +0000 (10:55 -0800)
This refactoring breaks up jsonrpc-server.c in a more modular fashion, in
preparation for adding code for table monitors.

ovsdb/jsonrpc-server.c
ovsdb/jsonrpc-server.h
ovsdb/ovsdb-server.c

index cea5ddc..a42696a 100644 (file)
 
 #include <errno.h>
 
+#include "column.h"
 #include "json.h"
 #include "jsonrpc.h"
 #include "ovsdb.h"
 #include "reconnect.h"
 #include "stream.h"
-#include "svec.h"
 #include "timeval.h"
 #include "trigger.h"
 
 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
 #include "vlog.h"
 
-struct ovsdb_jsonrpc_trigger {
-    struct ovsdb_trigger trigger;
-    struct ovsdb_jsonrpc_session *session;
-    struct hmap_node hmap_node; /* Element in session's trigger table. */
-    struct json *id;
-};
-
-static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
-    struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
-static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
-
-struct ovsdb_jsonrpc_session {
-    struct ovsdb_jsonrpc_server *server;
-    struct list node;           /* Element in server's sessions list. */
-    struct hmap triggers;
-    struct list completions;    /* Completed triggers. */
-
-    struct reconnect *reconnect; /* For back-off. */
-    bool active;                /* Active or passive connection? */
-    struct jsonrpc *rpc;
-    struct stream *stream;      /* Only if active == false and rpc == NULL. */
-};
+struct ovsdb_jsonrpc_session;
 
 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
                                                 const char *name);
 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
                                                  struct stream *);
-static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
-static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
-static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
-static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
-static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
-                                             struct jsonrpc_msg *);
-static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
-                                             struct jsonrpc_msg *);
+static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
+static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
+
+static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
+                                         struct json *id, struct json *params);
+static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
+    struct ovsdb_jsonrpc_session *, const struct json *id, size_t hash);
+static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
+static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_trigger_complete_done(
+    struct ovsdb_jsonrpc_session *);
+\f
+/* JSON-RPC database server. */
 
 struct ovsdb_jsonrpc_server {
     struct ovsdb *db;
@@ -78,50 +62,46 @@ struct ovsdb_jsonrpc_server {
     size_t n_listeners, allocated_listeners;
 };
 
-static void ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *,
-                                        struct pstream *);
-
-int
-ovsdb_jsonrpc_server_create(struct ovsdb *db, const struct svec *active,
-                            const struct svec *passive,
-                            struct ovsdb_jsonrpc_server **serverp)
+struct ovsdb_jsonrpc_server *
+ovsdb_jsonrpc_server_create(struct ovsdb *db)
 {
-    struct ovsdb_jsonrpc_server *server;
-    const char *name;
-    int retval = 0;
-    size_t i;
-
-    server = xzalloc(sizeof *server);
+    struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
     server->db = db;
     server->max_sessions = 64;
     server->max_triggers = 64;
     list_init(&server->sessions);
+    return server;
+}
 
-    SVEC_FOR_EACH (i, name, active) {
-        ovsdb_jsonrpc_session_create_active(server, name);
-    }
+int
+ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr, const char *name)
+{
+    struct pstream *pstream;
+    int error;
 
-    SVEC_FOR_EACH (i, name, passive) {
-        struct pstream *pstream;
-        int error;
+    error = pstream_open(name, &pstream);
+    if (error) {
+        return error;
+    }
 
-        error = pstream_open(name, &pstream);
-        if (!error) {
-            ovsdb_jsonrpc_server_listen(server, pstream);
-        } else {
-            ovs_error(error, "failed to listen on %s", name);
-            retval = error;
-        }
+    if (svr->n_listeners >= svr->allocated_listeners) {
+        svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
+                                    sizeof *svr->listeners);
     }
+    svr->listeners[svr->n_listeners++] = pstream;
+    return 0;
+}
 
-    *serverp = server;
-    return retval;
+void
+ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *svr,
+                             const char *name)
+{
+    ovsdb_jsonrpc_session_create_active(svr, name);
 }
 
 void
 ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
 {
-    struct ovsdb_jsonrpc_session *s, *next;
     size_t i;
 
     /* Accept new connections. */
@@ -144,20 +124,12 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
     }
 
     /* Handle each session. */
-    LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
-                        &svr->sessions) {
-        int error = ovsdb_jsonrpc_session_run(s);
-        if (error) {
-            ovsdb_jsonrpc_session_close(s);
-        }
-    }
+    ovsdb_jsonrpc_session_run_all(svr);
 }
 
 void
 ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
 {
-    struct ovsdb_jsonrpc_session *s;
-
     if (svr->n_sessions < svr->max_sessions) {
         size_t i;
 
@@ -166,62 +138,34 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
         }
     }
 
-    LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
-        ovsdb_jsonrpc_session_wait(s);
-    }
-}
-
-static void
-ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *svr,
-                            struct pstream *pstream)
-{
-    if (svr->n_listeners >= svr->allocated_listeners) {
-        svr->listeners = x2nrealloc(svr->listeners, &svr->allocated_listeners,
-                                    sizeof *svr->listeners);
-    }
-    svr->listeners[svr->n_listeners++] = pstream;
-}
-
-static struct ovsdb_jsonrpc_trigger *
-ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
-                           const struct json *id, size_t hash)
-{
-    struct ovsdb_jsonrpc_trigger *t;
-
-    HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
-                             &s->triggers) {
-        if (json_equal(t->id, id)) {
-            return t;
-        }
-    }
-
-    return NULL;
+    ovsdb_jsonrpc_session_wait_all(svr);
 }
+\f
+/* JSON-RPC database server session. */
 
-static void
-ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
-{
-    struct ovsdb_jsonrpc_session *s = t->session;
+struct ovsdb_jsonrpc_session {
+    struct ovsdb_jsonrpc_server *server;
+    struct list node;           /* Element in server's sessions list. */
 
-    if (s->rpc && !jsonrpc_get_status(s->rpc)) {
-        struct jsonrpc_msg *reply;
-        struct json *result;
+    /* Triggers. */
+    struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
+    struct list completions;    /* Completed triggers. */
 
-        result = ovsdb_trigger_steal_result(&t->trigger);
-        if (result) {
-            reply = jsonrpc_create_reply(result, t->id);
-        } else {
-            reply = jsonrpc_create_error(json_string_create("canceled"),
-                                         t->id);
-        }
-        jsonrpc_send(s->rpc, reply);
-    }
+    /* Connecting and reconnecting. */
+    struct reconnect *reconnect; /* For back-off. */
+    bool active;                /* Active or passive connection? */
+    struct jsonrpc *rpc;
+    struct stream *stream;      /* Only if active == false and rpc == NULL. */
+};
 
-    json_destroy(t->id);
-    ovsdb_trigger_destroy(&t->trigger);
-    hmap_remove(&s->triggers, &t->hmap_node);
-    free(t);
-}
+static void ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s);
+static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
+static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
+                                             struct jsonrpc_msg *);
+static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
+                                             struct jsonrpc_msg *);
 
 static struct ovsdb_jsonrpc_session *
 ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
@@ -275,14 +219,8 @@ ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
 {
     reconnect_disconnected(s->reconnect, time_msec(), 0);
     if (s->rpc) {
-        struct ovsdb_jsonrpc_trigger *t, *next;
-
         jsonrpc_error(s->rpc, EOF);
-        HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
-                            &s->triggers) {
-            ovsdb_jsonrpc_trigger_complete(t);
-        }
-
+        ovsdb_jsonrpc_trigger_complete_all(s);
         jsonrpc_close(s->rpc);
         s->rpc = NULL;
     } else if (s->stream) {
@@ -314,12 +252,7 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
 
         jsonrpc_run(s->rpc);
 
-        while (!list_is_empty(&s->completions)) {
-            struct ovsdb_jsonrpc_trigger *t
-                = CONTAINER_OF(s->completions.next,
-                               struct ovsdb_jsonrpc_trigger, trigger.node);
-            ovsdb_jsonrpc_trigger_complete(t);
-        }
+        ovsdb_jsonrpc_trigger_complete_done(s);
 
         if (!jsonrpc_get_backlog(s->rpc) && !jsonrpc_recv(s->rpc, &msg)) {
             reconnect_received(s->reconnect, time_msec());
@@ -374,7 +307,20 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *s)
         break;
     }
     return s->active || s->rpc ? 0 : ETIMEDOUT;
+}
 
+static void
+ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *svr)
+{
+    struct ovsdb_jsonrpc_session *s, *next;
+
+    LIST_FOR_EACH_SAFE (s, next, struct ovsdb_jsonrpc_session, node,
+                        &svr->sessions) {
+        int error = ovsdb_jsonrpc_session_run(s);
+        if (error) {
+            ovsdb_jsonrpc_session_close(s);
+        }
+    }
 }
 
 static void
@@ -391,38 +337,23 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *s)
     reconnect_wait(s->reconnect, time_msec());
 }
 
-static struct jsonrpc_msg *
-execute_transaction(struct ovsdb_jsonrpc_session *s,
-                    struct jsonrpc_msg *request)
+static void
+ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *svr)
 {
-    struct ovsdb_jsonrpc_trigger *t;
-    size_t hash;
+    struct ovsdb_jsonrpc_session *s;
 
-    /* Check for duplicate ID. */
-    hash = json_hash(request->id, 0);
-    t = ovsdb_jsonrpc_trigger_find(s, request->id, hash);
-    if (t) {
-        return jsonrpc_create_error(
-            json_string_create("duplicate request ID"), request->id);
+    LIST_FOR_EACH (s, struct ovsdb_jsonrpc_session, node, &svr->sessions) {
+        ovsdb_jsonrpc_session_wait(s);
     }
+}
 
-    /* Insert into trigger table. */
-    t = xmalloc(sizeof *t);
-    ovsdb_trigger_init(s->server->db,
-                       &t->trigger, request->params, &s->completions,
-                       time_msec());
-    t->session = s;
-    t->id = request->id;
-    hmap_insert(&s->triggers, &t->hmap_node, hash);
-
+static struct jsonrpc_msg *
+execute_transaction(struct ovsdb_jsonrpc_session *s,
+                    struct jsonrpc_msg *request)
+{
+    ovsdb_jsonrpc_trigger_create(s, request->id, request->params);
     request->id = NULL;
     request->params = NULL;
-
-    /* Complete early if possible. */
-    if (ovsdb_trigger_is_complete(&t->trigger)) {
-        ovsdb_jsonrpc_trigger_complete(t);
-    }
-
     return NULL;
 }
 
@@ -474,3 +405,110 @@ ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *s,
     }
     jsonrpc_msg_destroy(request);
 }
+\f
+/* JSON-RPC database server triggers.
+ *
+ * (Every transaction is treated as a trigger even if it doesn't actually have
+ * any "wait" operations.) */
+
+struct ovsdb_jsonrpc_trigger {
+    struct ovsdb_trigger trigger;
+    struct ovsdb_jsonrpc_session *session;
+    struct hmap_node hmap_node; /* In session's "triggers" hmap. */
+    struct json *id;
+};
+
+static void
+ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *s,
+                             struct json *id, struct json *params)
+{
+    struct ovsdb_jsonrpc_trigger *t;
+    size_t hash;
+
+    /* Check for duplicate ID. */
+    hash = json_hash(id, 0);
+    t = ovsdb_jsonrpc_trigger_find(s, id, hash);
+    if (t) {
+        jsonrpc_send(s->rpc, jsonrpc_create_error(
+                         json_string_create("duplicate request ID"), id));
+        json_destroy(id);
+        json_destroy(params);
+        return;
+    }
+
+    /* Insert into trigger table. */
+    t = xmalloc(sizeof *t);
+    ovsdb_trigger_init(s->server->db,
+                       &t->trigger, params, &s->completions,
+                       time_msec());
+    t->session = s;
+    t->id = id;
+    hmap_insert(&s->triggers, &t->hmap_node, hash);
+
+    /* Complete early if possible. */
+    if (ovsdb_trigger_is_complete(&t->trigger)) {
+        ovsdb_jsonrpc_trigger_complete(t);
+    }
+}
+
+static struct ovsdb_jsonrpc_trigger *
+ovsdb_jsonrpc_trigger_find(struct ovsdb_jsonrpc_session *s,
+                           const struct json *id, size_t hash)
+{
+    struct ovsdb_jsonrpc_trigger *t;
+
+    HMAP_FOR_EACH_WITH_HASH (t, struct ovsdb_jsonrpc_trigger, hmap_node, hash,
+                             &s->triggers) {
+        if (json_equal(t->id, id)) {
+            return t;
+        }
+    }
+
+    return NULL;
+}
+
+static void
+ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *t)
+{
+    struct ovsdb_jsonrpc_session *s = t->session;
+
+    if (s->rpc && !jsonrpc_get_status(s->rpc)) {
+        struct jsonrpc_msg *reply;
+        struct json *result;
+
+        result = ovsdb_trigger_steal_result(&t->trigger);
+        if (result) {
+            reply = jsonrpc_create_reply(result, t->id);
+        } else {
+            reply = jsonrpc_create_error(json_string_create("canceled"),
+                                         t->id);
+        }
+        jsonrpc_send(s->rpc, reply);
+    }
+
+    json_destroy(t->id);
+    ovsdb_trigger_destroy(&t->trigger);
+    hmap_remove(&s->triggers, &t->hmap_node);
+    free(t);
+}
+
+static void
+ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
+{
+    struct ovsdb_jsonrpc_trigger *t, *next;
+    HMAP_FOR_EACH_SAFE (t, next, struct ovsdb_jsonrpc_trigger, hmap_node,
+                        &s->triggers) {
+        ovsdb_jsonrpc_trigger_complete(t);
+    }
+}
+
+static void
+ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
+{
+    while (!list_is_empty(&s->completions)) {
+        struct ovsdb_jsonrpc_trigger *t
+            = CONTAINER_OF(s->completions.next,
+                           struct ovsdb_jsonrpc_trigger, trigger.node);
+        ovsdb_jsonrpc_trigger_complete(t);
+    }
+}
index 49b5f8a..0a80ac6 100644 (file)
 #define OVSDB_JSONRPC_SERVER_H 1
 
 struct ovsdb;
-struct ovsdb_jsonrpc_server;
-struct svec;
 
-int ovsdb_jsonrpc_server_create(struct ovsdb *, const struct svec *active,
-                                const struct svec *passive,
-                                struct ovsdb_jsonrpc_server **);
+struct ovsdb_jsonrpc_server *ovsdb_jsonrpc_server_create(struct ovsdb *);
+
+int ovsdb_jsonrpc_server_listen(struct ovsdb_jsonrpc_server *,
+                                const char *name);
+void ovsdb_jsonrpc_server_connect(struct ovsdb_jsonrpc_server *,
+                                  const char *name);
+
 void ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *);
 void ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *);
 
index a91e778..00c465d 100644 (file)
@@ -57,8 +57,10 @@ main(int argc, char *argv[])
     struct svec active, passive;
     struct ovsdb_error *error;
     struct ovsdb *db;
+    const char *name;
     char *file_name;
     int retval;
+    size_t i;
 
     set_program_name(argv[0]);
     register_fault_handlers();
@@ -74,9 +76,15 @@ main(int argc, char *argv[])
         ovs_fatal(0, "%s", ovsdb_error_to_string(error));
     }
 
-    retval = ovsdb_jsonrpc_server_create(db, &active, &passive, &jsonrpc);
-    if (retval) {
-        ovs_fatal(retval, "failed to initialize JSON-RPC server for OVSDB");
+    jsonrpc = ovsdb_jsonrpc_server_create(db);
+    SVEC_FOR_EACH (i, name, &active) {
+        ovsdb_jsonrpc_server_connect(jsonrpc, name);
+    }
+    SVEC_FOR_EACH (i, name, &passive) {
+        retval = ovsdb_jsonrpc_server_listen(jsonrpc, name);
+        if (retval) {
+            ovs_fatal(retval, "failed to listen on %s", name);
+        }
     }
     svec_destroy(&active);
     svec_destroy(&passive);