jsonrpc-server: make setting mt->select into its own functions
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
index 692830c..a70ed38 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014 Nicira, Inc.
+/* Copyright (c) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Nicira, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -37,7 +37,7 @@
 #include "timeval.h"
 #include "transaction.h"
 #include "trigger.h"
-#include "vlog.h"
+#include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
 
@@ -78,8 +78,9 @@ static void ovsdb_jsonrpc_trigger_complete_done(
     struct ovsdb_jsonrpc_session *);
 
 /* Monitors. */
-static struct json *ovsdb_jsonrpc_monitor_create(
-    struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
+    struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
+    const struct json *request_id);
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
     struct ovsdb_jsonrpc_session *,
     struct json_array *params,
@@ -102,7 +103,7 @@ struct ovsdb_jsonrpc_server {
 struct ovsdb_jsonrpc_remote {
     struct ovsdb_jsonrpc_server *server;
     struct pstream *listener;   /* Listener, if passive. */
-    struct list sessions;       /* List of "struct ovsdb_jsonrpc_session"s. */
+    struct ovs_list sessions;   /* List of "struct ovsdb_jsonrpc_session"s. */
     uint8_t dscp;
 };
 
@@ -121,7 +122,7 @@ ovsdb_jsonrpc_server_create(void)
 {
     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
     ovsdb_server_init(&server->up);
-    server->max_sessions = 64;
+    server->max_sessions = 330;   /* Random limit. */
     shash_init(&server->remotes);
     return server;
 }
@@ -198,10 +199,16 @@ ovsdb_jsonrpc_server_set_remotes(struct ovsdb_jsonrpc_server *svr,
     struct shash_node *node, *next;
 
     SHASH_FOR_EACH_SAFE (node, next, &svr->remotes) {
-        if (!shash_find(new_remotes, node->name)) {
+        struct ovsdb_jsonrpc_remote *remote = node->data;
+        struct ovsdb_jsonrpc_options *options
+            = shash_find_data(new_remotes, node->name);
+
+        if (!options) {
             VLOG_INFO("%s: remote deconfigured", node->name);
             ovsdb_jsonrpc_server_del_remote(node);
-        }
+        } else if (options->dscp != remote->dscp) {
+            ovsdb_jsonrpc_server_del_remote(node);
+         }
     }
     SHASH_FOR_EACH (node, new_remotes) {
         const struct ovsdb_jsonrpc_options *options = node->data;
@@ -308,20 +315,26 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
     SHASH_FOR_EACH (node, &svr->remotes) {
         struct ovsdb_jsonrpc_remote *remote = node->data;
 
-        if (remote->listener && svr->n_sessions < svr->max_sessions) {
-            struct stream *stream;
-            int error;
-
-            error = pstream_accept(remote->listener, &stream);
-            if (!error) {
-                struct jsonrpc_session *js;
-                js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
-                                                     remote->dscp);
-                ovsdb_jsonrpc_session_create(remote, js);
-            } else if (error != EAGAIN) {
-                VLOG_WARN_RL(&rl, "%s: accept failed: %s",
+        if (remote->listener) {
+            if (svr->n_sessions < svr->max_sessions) {
+                struct stream *stream;
+                int error;
+
+                error = pstream_accept(remote->listener, &stream);
+                if (!error) {
+                    struct jsonrpc_session *js;
+                    js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
+                                                         remote->dscp);
+                    ovsdb_jsonrpc_session_create(remote, js);
+                } else if (error != EAGAIN) {
+                    VLOG_WARN_RL(&rl, "%s: accept failed: %s",
+                                 pstream_get_name(remote->listener),
+                                 ovs_strerror(error));
+                }
+            } else {
+                VLOG_WARN_RL(&rl, "%s: connection exceeded maximum (%d)",
                              pstream_get_name(remote->listener),
-                             ovs_strerror(error));
+                             svr->max_sessions);
             }
         }
 
@@ -364,7 +377,7 @@ ovsdb_jsonrpc_server_get_memory_usage(const struct ovsdb_jsonrpc_server *svr,
 /* JSON-RPC database server session. */
 
 struct ovsdb_jsonrpc_session {
-    struct list node;           /* Element in remote's sessions list. */
+    struct ovs_list node;       /* Element in remote's sessions list. */
     struct ovsdb_session up;
     struct ovsdb_jsonrpc_remote *remote;
 
@@ -384,8 +397,6 @@ static int ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_get_memory_usage(
     const struct ovsdb_jsonrpc_session *, struct simap *usage);
-static void ovsdb_jsonrpc_session_set_options(
-    struct ovsdb_jsonrpc_session *, const struct ovsdb_jsonrpc_options *);
 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
@@ -556,7 +567,10 @@ ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *remote)
 }
 
 /* Sets the options for all of the JSON-RPC sessions managed by 'remote' to
- * 'options'. */
+ * 'options'.
+ *
+ * (The dscp value can't be changed directly; the caller must instead close and
+ * re-open the session.) */
 static void
 ovsdb_jsonrpc_session_set_all_options(
     struct ovsdb_jsonrpc_remote *remote,
@@ -564,22 +578,6 @@ ovsdb_jsonrpc_session_set_all_options(
 {
     struct ovsdb_jsonrpc_session *s;
 
-    if (remote->listener) {
-        int error;
-
-        error = pstream_set_dscp(remote->listener, options->dscp);
-        if (error) {
-            VLOG_ERR("%s: set_dscp failed %s",
-                     pstream_get_name(remote->listener), ovs_strerror(error));
-        } else {
-            remote->dscp = options->dscp;
-        }
-        /*
-         * XXX race window between setting dscp to listening socket
-         * and accepting socket. Accepted socket may have old dscp value.
-         * Ignore this race window for now.
-         */
-    }
     LIST_FOR_EACH (s, node, &remote->sessions) {
         ovsdb_jsonrpc_session_set_options(s, options);
     }
@@ -677,7 +675,7 @@ ovsdb_jsonrpc_lookup_db(const struct ovsdb_jsonrpc_session *s,
     return db;
 
 error:
-    *replyp = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
+    *replyp = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
     ovsdb_error_destroy(error);
     return NULL;
 }
@@ -755,7 +753,7 @@ ovsdb_jsonrpc_session_lock(struct ovsdb_jsonrpc_session *s,
     return jsonrpc_create_reply(result, request->id);
 
 error:
-    reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
+    reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
     ovsdb_error_destroy(error);
     return reply;
 }
@@ -815,7 +813,7 @@ ovsdb_jsonrpc_session_unlock(struct ovsdb_jsonrpc_session *s,
     return jsonrpc_create_reply(json_object_create(), request->id);
 
 error:
-    reply = jsonrpc_create_reply(ovsdb_error_to_json(error), request->id);
+    reply = jsonrpc_create_error(ovsdb_error_to_json(error), request->id);
     ovsdb_error_destroy(error);
     return reply;
 }
@@ -845,9 +843,8 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
     } else if (!strcmp(request->method, "monitor")) {
         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
         if (!reply) {
-            reply = jsonrpc_create_reply(
-                ovsdb_jsonrpc_monitor_create(s, db, request->params),
-                request->id);
+            reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
+                                                 request->id);
         }
     } else if (!strcmp(request->method, "monitor_cancel")) {
         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
@@ -1029,9 +1026,9 @@ ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
     }
 }
 \f
-/* JSON-RPC database table monitors. */
+/* database table monitors. */
 
-enum ovsdb_jsonrpc_monitor_selection {
+enum ovsdb_monitor_selection {
     OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
     OJMS_INSERT = 1 << 1,       /* New rows. */
     OJMS_DELETE = 1 << 2,       /* Deleted rows. */
@@ -1039,13 +1036,13 @@ enum ovsdb_jsonrpc_monitor_selection {
 };
 
 /* A particular column being monitored. */
-struct ovsdb_jsonrpc_monitor_column {
+struct ovsdb_monitor_column {
     const struct ovsdb_column *column;
-    enum ovsdb_jsonrpc_monitor_selection select;
+    enum ovsdb_monitor_selection select;
 };
 
 /* A row that has changed in a monitored table. */
-struct ovsdb_jsonrpc_monitor_row {
+struct ovsdb_monitor_row {
     struct hmap_node hmap_node; /* In ovsdb_jsonrpc_monitor_table.changes. */
     struct uuid uuid;           /* UUID of row that changed. */
     struct ovsdb_datum *old;    /* Old data, NULL for an inserted row. */
@@ -1053,38 +1050,49 @@ struct ovsdb_jsonrpc_monitor_row {
 };
 
 /* A particular table being monitored. */
-struct ovsdb_jsonrpc_monitor_table {
+struct ovsdb_monitor_table {
     const struct ovsdb_table *table;
 
     /* This is the union (bitwise-OR) of the 'select' values in all of the
      * members of 'columns' below. */
-    enum ovsdb_jsonrpc_monitor_selection select;
+    enum ovsdb_monitor_selection select;
 
     /* Columns being monitored. */
-    struct ovsdb_jsonrpc_monitor_column *columns;
+    struct ovsdb_monitor_column *columns;
     size_t n_columns;
 
-    /* Contains 'struct ovsdb_jsonrpc_monitor_row's for rows that have been
+    /* Contains 'struct ovsdb_monitor_row's for rows that have been
      * updated but not yet flushed to the jsonrpc connection. */
     struct hmap changes;
 };
 
+struct ovsdb_jsonrpc_monitor;
+/*  Backend monitor.
+ *
+ *  ovsdb_monitor keep track of the ovsdb changes.
+ */
 /* A collection of tables being monitored. */
-struct ovsdb_jsonrpc_monitor {
+struct ovsdb_monitor {
     struct ovsdb_replica replica;
+    struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
+    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
+};
+
+/* Jsonrpc front end monitor. */
+struct ovsdb_jsonrpc_monitor {
     struct ovsdb_jsonrpc_session *session;
     struct ovsdb *db;
     struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
 
     struct json *monitor_id;
-    struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
+    struct ovsdb_monitor *dbmon;
 };
 
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
 
 struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
     struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
-static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
+static void ovsdb_monitor_destroy(struct ovsdb_replica *);
 static struct json *ovsdb_jsonrpc_monitor_get_initial(
     const struct ovsdb_jsonrpc_monitor *);
 
@@ -1113,12 +1121,12 @@ ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
 }
 
 static void
-ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
-                                 const struct ovsdb_column *column,
-                                 enum ovsdb_jsonrpc_monitor_selection select,
-                                 size_t *allocated_columns)
+ovsdb_add_monitor_column(struct ovsdb_monitor_table *mt,
+                         const struct ovsdb_column *column,
+                         enum ovsdb_monitor_selection select,
+                         size_t *allocated_columns)
 {
-    struct ovsdb_jsonrpc_monitor_column *c;
+    struct ovsdb_monitor_column *c;
 
     if (mt->n_columns >= *allocated_columns) {
         mt->columns = x2nrealloc(mt->columns, allocated_columns,
@@ -1131,21 +1139,28 @@ ovsdb_jsonrpc_add_monitor_column(struct ovsdb_jsonrpc_monitor_table *mt,
 }
 
 static int
-compare_ovsdb_jsonrpc_monitor_column(const void *a_, const void *b_)
+compare_ovsdb_monitor_column(const void *a_, const void *b_)
 {
-    const struct ovsdb_jsonrpc_monitor_column *a = a_;
-    const struct ovsdb_jsonrpc_monitor_column *b = b_;
+    const struct ovsdb_monitor_column *a = a_;
+    const struct ovsdb_monitor_column *b = b_;
 
     return a->column < b->column ? -1 : a->column > b->column;
 }
 
-static struct ovsdb_error * WARN_UNUSED_RESULT
-ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
+static void
+ovsdb_monitor_add_select(struct ovsdb_monitor_table *mt,
+                         enum ovsdb_monitor_selection select)
+{
+    mt->select |= select;
+}
+
+static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
+ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor_table *mt,
                                     const struct json *monitor_request,
                                     size_t *allocated_columns)
 {
     const struct ovsdb_table_schema *ts = mt->table->schema;
-    enum ovsdb_jsonrpc_monitor_selection select;
+    enum ovsdb_monitor_selection select;
     const struct json *columns, *select_json;
     struct ovsdb_parser parser;
     struct ovsdb_error *error;
@@ -1181,8 +1196,8 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
     } else {
         select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
     }
-    mt->select |= select;
 
+    ovsdb_monitor_add_select(mt, select);
     if (columns) {
         size_t i;
 
@@ -1206,8 +1221,7 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
                                           "column name", s);
             }
-            ovsdb_jsonrpc_add_monitor_column(mt, column, select,
-                                             allocated_columns);
+            ovsdb_add_monitor_column(mt, column, select, allocated_columns);
         }
     } else {
         struct shash_node *node;
@@ -1215,8 +1229,8 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
         SHASH_FOR_EACH (node, &ts->columns) {
             const struct ovsdb_column *column = node->data;
             if (column->index != OVSDB_COL_UUID) {
-                ovsdb_jsonrpc_add_monitor_column(mt, column, select,
-                                                 allocated_columns);
+                ovsdb_add_monitor_column(mt, column, select,
+                                         allocated_columns);
             }
         }
     }
@@ -1224,9 +1238,27 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_jsonrpc_monitor_table *mt,
     return NULL;
 }
 
-static struct json *
+static struct ovsdb_monitor *
+ovsdb_monitor_create(struct ovsdb *db,
+                     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor,
+                     const struct ovsdb_replica_class *replica_class)
+{
+    struct ovsdb_monitor *m;
+
+    m = xzalloc(sizeof *m);
+
+    ovsdb_replica_init(&m->replica, replica_class);
+    ovsdb_add_replica(db, &m->replica);
+    m->jsonrpc_monitor = jsonrpc_monitor;
+    shash_init(&m->tables);
+
+    return m;
+}
+
+static struct jsonrpc_msg *
 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
-                             struct json *params)
+                             struct json *params,
+                             const struct json *request_id)
 {
     struct ovsdb_jsonrpc_monitor *m = NULL;
     struct json *monitor_id, *monitor_requests;
@@ -1252,17 +1284,15 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     }
 
     m = xzalloc(sizeof *m);
-    ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
-    ovsdb_add_replica(db, &m->replica);
     m->session = s;
     m->db = db;
+    m->dbmon = ovsdb_monitor_create(db, m, &ovsdb_jsonrpc_replica_class);
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
     m->monitor_id = json_clone(monitor_id);
-    shash_init(&m->tables);
 
     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
         const struct ovsdb_table *table;
-        struct ovsdb_jsonrpc_monitor_table *mt;
+        struct ovsdb_monitor_table *mt;
         size_t allocated_columns;
         const struct json *mr_value;
         size_t i;
@@ -1277,7 +1307,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         mt = xzalloc(sizeof *mt);
         mt->table = table;
         hmap_init(&mt->changes);
-        shash_add(&m->tables, table->schema->name, mt);
+        shash_add(&m->dbmon->tables, table->schema->name, mt);
 
         /* Parse columns. */
         mr_value = node->data;
@@ -1302,7 +1332,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
 
         /* Check for duplicate columns. */
         qsort(mt->columns, mt->n_columns, sizeof *mt->columns,
-              compare_ovsdb_jsonrpc_monitor_column);
+              compare_ovsdb_monitor_column);
         for (i = 1; i < mt->n_columns; i++) {
             if (mt->columns[i].column == mt->columns[i - 1].column) {
                 error = ovsdb_syntax_error(mr_value, NULL, "column %s "
@@ -1313,16 +1343,17 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         }
     }
 
-    return ovsdb_jsonrpc_monitor_get_initial(m);
+    return jsonrpc_create_reply(ovsdb_jsonrpc_monitor_get_initial(m),
+                                request_id);
 
 error:
     if (m) {
-        ovsdb_remove_replica(m->db, &m->replica);
+        ovsdb_remove_replica(m->db, &m->dbmon->replica);
     }
 
     json = ovsdb_error_to_json(error);
     ovsdb_error_destroy(error);
-    return json;
+    return jsonrpc_create_error(json, request_id);
 }
 
 static struct jsonrpc_msg *
@@ -1341,7 +1372,7 @@ ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
             return jsonrpc_create_error(json_string_create("unknown monitor"),
                                         request_id);
         } else {
-            ovsdb_remove_replica(m->db, &m->replica);
+            ovsdb_remove_replica(m->db, &m->dbmon->replica);
             return jsonrpc_create_reply(json_object_create(), request_id);
         }
     }
@@ -1353,29 +1384,29 @@ ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
     struct ovsdb_jsonrpc_monitor *m, *next;
 
     HMAP_FOR_EACH_SAFE (m, next, node, &s->monitors) {
-        ovsdb_remove_replica(m->db, &m->replica);
+        ovsdb_remove_replica(m->db, &m->dbmon->replica);
     }
 }
 
-static struct ovsdb_jsonrpc_monitor *
-ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
+static struct ovsdb_monitor *
+ovsdb_monitor_cast(struct ovsdb_replica *replica)
 {
     ovs_assert(replica->class == &ovsdb_jsonrpc_replica_class);
-    return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
+    return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
 }
 
-struct ovsdb_jsonrpc_monitor_aux {
-    const struct ovsdb_jsonrpc_monitor *monitor;
-    struct ovsdb_jsonrpc_monitor_table *mt;
+struct ovsdb_monitor_aux {
+    const struct ovsdb_monitor *monitor;
+    struct ovsdb_monitor_table *mt;
 };
 
-/* Finds and returns the ovsdb_jsonrpc_monitor_row in 'mt->changes' for the
+/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the
  * given 'uuid', or NULL if there is no such row. */
-static struct ovsdb_jsonrpc_monitor_row *
-ovsdb_jsonrpc_monitor_row_find(const struct ovsdb_jsonrpc_monitor_table *mt,
-                               const struct uuid *uuid)
+static struct ovsdb_monitor_row *
+ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
+                       const struct uuid *uuid)
 {
-    struct ovsdb_jsonrpc_monitor_row *row;
+    struct ovsdb_monitor_row *row;
 
     HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
         if (uuid_equals(uuid, &row->uuid)) {
@@ -1391,7 +1422,7 @@ ovsdb_jsonrpc_monitor_row_find(const struct ovsdb_jsonrpc_monitor_table *mt,
  *
  * If 'row' is NULL, returns NULL. */
 static struct ovsdb_datum *
-clone_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
+clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
                        const struct ovsdb_row *row)
 {
     struct ovsdb_datum *data;
@@ -1416,7 +1447,7 @@ clone_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
 /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data from
  * in 'row' drawn from the columns represented by mt->columns[]. */
 static void
-update_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
+update_monitor_row_data(const struct ovsdb_monitor_table *mt,
                         const struct ovsdb_row *row,
                         struct ovsdb_datum *data)
 {
@@ -1438,7 +1469,7 @@ update_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
 /* Frees all of the mt->n_columns ovsdb_datums in data[], using the types taken
  * from mt->columns[], plus 'data' itself. */
 static void
-free_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
+free_monitor_row_data(const struct ovsdb_monitor_table *mt,
                       struct ovsdb_datum *data)
 {
     if (data) {
@@ -1455,8 +1486,8 @@ free_monitor_row_data(const struct ovsdb_jsonrpc_monitor_table *mt,
 
 /* Frees 'row', which must have been created from 'mt'. */
 static void
-ovsdb_jsonrpc_monitor_row_destroy(const struct ovsdb_jsonrpc_monitor_table *mt,
-                                  struct ovsdb_jsonrpc_monitor_row *row)
+ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
+                          struct ovsdb_monitor_row *row)
 {
     if (row) {
         free_monitor_row_data(mt, row->old);
@@ -1466,17 +1497,17 @@ ovsdb_jsonrpc_monitor_row_destroy(const struct ovsdb_jsonrpc_monitor_table *mt,
 }
 
 static bool
-ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
-                                const struct ovsdb_row *new,
-                                const unsigned long int *changed OVS_UNUSED,
-                                void *aux_)
+ovsdb_monitor_change_cb(const struct ovsdb_row *old,
+                        const struct ovsdb_row *new,
+                        const unsigned long int *changed OVS_UNUSED,
+                        void *aux_)
 {
-    struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
-    const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
+    struct ovsdb_monitor_aux *aux = aux_;
+    const struct ovsdb_monitor *m = aux->monitor;
     struct ovsdb_table *table = new ? new->table : old->table;
     const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
-    struct ovsdb_jsonrpc_monitor_row *change;
-    struct ovsdb_jsonrpc_monitor_table *mt;
+    struct ovsdb_monitor_row *change;
+    struct ovsdb_monitor_table *mt;
 
     if (!aux->mt || table != aux->mt->table) {
         aux->mt = shash_find_data(&m->tables, table->schema->name);
@@ -1488,7 +1519,7 @@ ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
     }
     mt = aux->mt;
 
-    change = ovsdb_jsonrpc_monitor_row_find(mt, uuid);
+    change = ovsdb_monitor_row_find(mt, uuid);
     if (!change) {
         change = xmalloc(sizeof *change);
         hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
@@ -1512,8 +1543,8 @@ ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
     return true;
 }
 
-/* Returns JSON for a <row-update> (as described in ovsdb/SPECS) for 'row'
- * within 'mt', or NULL if no row update should be sent.
+/* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
+ * 'mt', or NULL if no row update should be sent.
  *
  * The caller should specify 'initial' as true if the returned JSON is going to
  * be used as part of the initial reply to a "monitor" request, false if it is
@@ -1522,12 +1553,12 @@ ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
  * 'changed' must be a scratch buffer for internal use that is at least
  * bitmap_n_bytes(mt->n_columns) bytes long. */
 static struct json *
-ovsdb_jsonrpc_monitor_compose_row_update(
-    const struct ovsdb_jsonrpc_monitor_table *mt,
-    const struct ovsdb_jsonrpc_monitor_row *row,
+ovsdb_monitor_compose_row_update(
+    const struct ovsdb_monitor_table *mt,
+    const struct ovsdb_monitor_row *row,
     bool initial, unsigned long int *changed)
 {
-    enum ovsdb_jsonrpc_monitor_selection type;
+    enum ovsdb_monitor_selection type;
     struct json *old_json, *new_json;
     struct json *row_json;
     size_t i;
@@ -1570,7 +1601,7 @@ ovsdb_jsonrpc_monitor_compose_row_update(
         json_object_put(row_json, "new", new_json);
     }
     for (i = 0; i < mt->n_columns; i++) {
-        const struct ovsdb_jsonrpc_monitor_column *c = &mt->columns[i];
+        const struct ovsdb_monitor_column *c = &mt->columns[i];
 
         if (!(type & c->select)) {
             /* We don't care about this type of change for this
@@ -1596,9 +1627,9 @@ ovsdb_jsonrpc_monitor_compose_row_update(
 }
 
 /* Constructs and returns JSON for a <table-updates> object (as described in
- * ovsdb/SPECS) for all the outstanding changes within 'monitor', and deletes
- * all the outstanding changes from 'monitor'.  Returns NULL if no update needs
- * to be sent.
+ * RFC 7047) for all the outstanding changes within 'monitor', and deletes all
+ * the outstanding changes from 'monitor'.  Returns NULL if no update needs to
+ * be sent.
  *
  * The caller should specify 'initial' as true if the returned JSON is going to
  * be used as part of the initial reply to a "monitor" request, false if it is
@@ -1613,23 +1644,23 @@ ovsdb_jsonrpc_monitor_compose_table_update(
     size_t max_columns;
 
     max_columns = 0;
-    SHASH_FOR_EACH (node, &monitor->tables) {
-        struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+    SHASH_FOR_EACH (node, &monitor->dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
 
         max_columns = MAX(max_columns, mt->n_columns);
     }
     changed = xmalloc(bitmap_n_bytes(max_columns));
 
     json = NULL;
-    SHASH_FOR_EACH (node, &monitor->tables) {
-        struct ovsdb_jsonrpc_monitor_table *mt = node->data;
-        struct ovsdb_jsonrpc_monitor_row *row, *next;
+    SHASH_FOR_EACH (node, &monitor->dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
+        struct ovsdb_monitor_row *row, *next;
         struct json *table_json = NULL;
 
         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
             struct json *row_json;
 
-            row_json = ovsdb_jsonrpc_monitor_compose_row_update(
+            row_json = ovsdb_monitor_compose_row_update(
                 mt, row, initial, changed);
             if (row_json) {
                 char uuid[UUID_LEN + 1];
@@ -1651,7 +1682,7 @@ ovsdb_jsonrpc_monitor_compose_table_update(
             }
 
             hmap_remove(&mt->changes, &row->hmap_node);
-            ovsdb_jsonrpc_monitor_row_destroy(mt, row);
+            ovsdb_monitor_row_destroy(mt, row);
         }
     }
 
@@ -1668,8 +1699,8 @@ ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
     HMAP_FOR_EACH (m, node, &s->monitors) {
         struct shash_node *node;
 
-        SHASH_FOR_EACH (node, &m->tables) {
-            struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+        SHASH_FOR_EACH (node, &m->dbmon->tables) {
+            struct ovsdb_monitor_table *mt = node->data;
 
             if (!hmap_is_empty(&mt->changes)) {
                 return true;
@@ -1701,23 +1732,23 @@ ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
 }
 
 static void
-ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
-                               const struct ovsdb_jsonrpc_monitor *m)
+ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
+                       const struct ovsdb_monitor *m)
 {
     aux->monitor = m;
     aux->mt = NULL;
 }
 
 static struct ovsdb_error *
-ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
-                             const struct ovsdb_txn *txn,
-                             bool durable OVS_UNUSED)
+ovsdb_monitor_commit(struct ovsdb_replica *replica,
+                     const struct ovsdb_txn *txn,
+                     bool durable OVS_UNUSED)
 {
-    struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
-    struct ovsdb_jsonrpc_monitor_aux aux;
+    struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
+    struct ovsdb_monitor_aux aux;
 
-    ovsdb_jsonrpc_monitor_init_aux(&aux, m);
-    ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
+    ovsdb_monitor_init_aux(&aux, m);
+    ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
 
     return NULL;
 }
@@ -1725,19 +1756,19 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
 static struct json *
 ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
 {
-    struct ovsdb_jsonrpc_monitor_aux aux;
+    struct ovsdb_monitor_aux aux;
     struct shash_node *node;
     struct json *json;
 
-    ovsdb_jsonrpc_monitor_init_aux(&aux, m);
-    SHASH_FOR_EACH (node, &m->tables) {
-        struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+    ovsdb_monitor_init_aux(&aux, m->dbmon);
+    SHASH_FOR_EACH (node, &m->dbmon->tables) {
+        struct ovsdb_monitor_table *mt = node->data;
 
         if (mt->select & OJMS_INITIAL) {
             struct ovsdb_row *row;
 
             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-                ovsdb_jsonrpc_monitor_change_cb(NULL, row, NULL, &aux);
+                ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
             }
         }
     }
@@ -1746,19 +1777,20 @@ ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
 }
 
 static void
-ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
+ovsdb_monitor_destroy(struct ovsdb_replica *replica)
 {
-    struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
+    struct ovsdb_monitor *m = ovsdb_monitor_cast(replica);
+    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor = m->jsonrpc_monitor;
     struct shash_node *node;
 
-    json_destroy(m->monitor_id);
+    json_destroy(jsonrpc_monitor->monitor_id);
     SHASH_FOR_EACH (node, &m->tables) {
-        struct ovsdb_jsonrpc_monitor_table *mt = node->data;
-        struct ovsdb_jsonrpc_monitor_row *row, *next;
+        struct ovsdb_monitor_table *mt = node->data;
+        struct ovsdb_monitor_row *row, *next;
 
         HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
             hmap_remove(&mt->changes, &row->hmap_node);
-            ovsdb_jsonrpc_monitor_row_destroy(mt, row);
+            ovsdb_monitor_row_destroy(mt, row);
         }
         hmap_destroy(&mt->changes);
 
@@ -1766,11 +1798,12 @@ ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
         free(mt);
     }
     shash_destroy(&m->tables);
-    hmap_remove(&m->session->monitors, &m->node);
+    hmap_remove(&jsonrpc_monitor->session->monitors, &jsonrpc_monitor->node);
+    free(jsonrpc_monitor);
     free(m);
 }
 
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
-    ovsdb_jsonrpc_monitor_commit,
-    ovsdb_jsonrpc_monitor_destroy
+    ovsdb_monitor_commit,
+    ovsdb_monitor_destroy
 };