ovsdb-client: support monitor-cond method
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
index efd83b8..864fb03 100644 (file)
 
 #include "bitmap.h"
 #include "column.h"
-#include "dynamic-string.h"
+#include "openvswitch/dynamic-string.h"
+#include "monitor.h"
 #include "json.h"
 #include "jsonrpc.h"
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "ovsdb.h"
+#include "condition.h"
 #include "poll-loop.h"
 #include "reconnect.h"
 #include "row.h"
@@ -37,7 +39,6 @@
 #include "timeval.h"
 #include "transaction.h"
 #include "trigger.h"
-#include "monitor.h"
 #include "openvswitch/vlog.h"
 
 VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
@@ -45,6 +46,10 @@ VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
 struct ovsdb_jsonrpc_remote;
 struct ovsdb_jsonrpc_session;
 
+/* Set false to defeature monitor_cond, causing jsonrpc to respond to
+ * monitor_cond method with an error.  */
+static bool monitor_cond_enable__ = true;
+
 /* Message rate-limiting. */
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
 
@@ -59,9 +64,12 @@ static void ovsdb_jsonrpc_session_close_all(struct ovsdb_jsonrpc_remote *);
 static void ovsdb_jsonrpc_session_reconnect_all(struct ovsdb_jsonrpc_remote *);
 static void ovsdb_jsonrpc_session_set_all_options(
     struct ovsdb_jsonrpc_remote *, const struct ovsdb_jsonrpc_options *);
-static bool ovsdb_jsonrpc_session_get_status(
+static bool ovsdb_jsonrpc_active_session_get_status(
     const struct ovsdb_jsonrpc_remote *,
     struct ovsdb_jsonrpc_remote_status *);
+static void ovsdb_jsonrpc_session_get_status(
+    const struct ovsdb_jsonrpc_session *,
+    struct ovsdb_jsonrpc_remote_status *);
 static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
 static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
@@ -81,6 +89,10 @@ static void ovsdb_jsonrpc_trigger_complete_done(
 /* Monitors. */
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
+    enum ovsdb_monitor_version, const struct json *request_id);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
+    struct ovsdb_jsonrpc_session *s,
+    struct json *params,
     const struct json *request_id);
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
     struct ovsdb_jsonrpc_session *,
@@ -91,13 +103,16 @@ static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
 static struct json *ovsdb_jsonrpc_monitor_compose_update(
     struct ovsdb_jsonrpc_monitor *monitor, bool initial);
+static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
+                                        const struct ovsdb_jsonrpc_monitor *m,
+                                        struct json *params);
 
 \f
 /* JSON-RPC database server. */
 
 struct ovsdb_jsonrpc_server {
     struct ovsdb_server up;
-    unsigned int n_sessions, max_sessions;
+    unsigned int n_sessions;
     struct shash remotes;      /* Contains "struct ovsdb_jsonrpc_remote *"s. */
 };
 
@@ -126,7 +141,6 @@ ovsdb_jsonrpc_server_create(void)
 {
     struct ovsdb_jsonrpc_server *server = xzalloc(sizeof *server);
     ovsdb_server_init(&server->up);
-    server->max_sessions = 330;   /* Random limit. */
     shash_init(&server->remotes);
     return server;
 }
@@ -248,7 +262,7 @@ ovsdb_jsonrpc_server_add_remote(struct ovsdb_jsonrpc_server *svr,
     remote = xmalloc(sizeof *remote);
     remote->server = svr;
     remote->listener = listener;
-    list_init(&remote->sessions);
+    ovs_list_init(&remote->sessions);
     remote->dscp = options->dscp;
     shash_add(&svr->remotes, name, remote);
 
@@ -272,7 +286,7 @@ ovsdb_jsonrpc_server_del_remote(struct shash_node *node)
 /* Stores status information for the remote named 'target', which should have
  * been configured on 'svr' with a call to ovsdb_jsonrpc_server_set_remotes(),
  * into '*status'.  On success returns true, on failure (if 'svr' doesn't have
- * a remote named 'target' or if that remote is an inbound remote that has no
+ * a remote named 'target' or if that remote is an outbound remote that has no
  * active connections) returns false.  On failure, 'status' will be zeroed.
  */
 bool
@@ -285,7 +299,19 @@ ovsdb_jsonrpc_server_get_remote_status(
     memset(status, 0, sizeof *status);
 
     remote = shash_find_data(&svr->remotes, target);
-    return remote && ovsdb_jsonrpc_session_get_status(remote, status);
+
+    if (!remote) {
+        return false;
+    }
+
+    if (remote->listener) {
+        status->bound_port = pstream_get_bound_port(remote->listener);
+        status->is_connected = !ovs_list_is_empty(&remote->sessions);
+        status->n_connections = ovs_list_size(&remote->sessions);
+        return true;
+    }
+
+    return ovsdb_jsonrpc_active_session_get_status(remote, status);
 }
 
 void
@@ -320,25 +346,19 @@ ovsdb_jsonrpc_server_run(struct ovsdb_jsonrpc_server *svr)
         struct ovsdb_jsonrpc_remote *remote = node->data;
 
         if (remote->listener) {
-            if (svr->n_sessions < svr->max_sessions) {
-                struct stream *stream;
-                int error;
-
-                error = pstream_accept(remote->listener, &stream);
-                if (!error) {
-                    struct jsonrpc_session *js;
-                    js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
-                                                         remote->dscp);
-                    ovsdb_jsonrpc_session_create(remote, js);
-                } else if (error != EAGAIN) {
-                    VLOG_WARN_RL(&rl, "%s: accept failed: %s",
-                                 pstream_get_name(remote->listener),
-                                 ovs_strerror(error));
-                }
-            } else {
-                VLOG_WARN_RL(&rl, "%s: connection exceeded maximum (%d)",
+            struct stream *stream;
+            int error;
+
+            error = pstream_accept(remote->listener, &stream);
+            if (!error) {
+                struct jsonrpc_session *js;
+                js = jsonrpc_session_open_unreliably(jsonrpc_open(stream),
+                                                     remote->dscp);
+                ovsdb_jsonrpc_session_create(remote, js);
+            } else if (error != EAGAIN) {
+                VLOG_WARN_RL(&rl, "%s: accept failed: %s",
                              pstream_get_name(remote->listener),
-                             svr->max_sessions);
+                             ovs_strerror(error));
             }
         }
 
@@ -354,7 +374,7 @@ ovsdb_jsonrpc_server_wait(struct ovsdb_jsonrpc_server *svr)
     SHASH_FOR_EACH (node, &svr->remotes) {
         struct ovsdb_jsonrpc_remote *remote = node->data;
 
-        if (remote->listener && svr->n_sessions < svr->max_sessions) {
+        if (remote->listener) {
             pstream_wait(remote->listener);
         }
 
@@ -402,7 +422,7 @@ static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_get_memory_usage(
     const struct ovsdb_jsonrpc_session *, struct simap *usage);
 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
-                                             struct jsonrpc_msg *);
+                                              struct jsonrpc_msg *);
 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
 
@@ -415,7 +435,7 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_remote *remote,
     s = xzalloc(sizeof *s);
     ovsdb_session_init(&s->up, &remote->server->up);
     s->remote = remote;
-    list_push_back(&remote->sessions, &s->node);
+    ovs_list_push_back(&remote->sessions, &s->node);
     hmap_init(&s->triggers);
     hmap_init(&s->monitors);
     s->js = js;
@@ -437,7 +457,7 @@ ovsdb_jsonrpc_session_close(struct ovsdb_jsonrpc_session *s)
     hmap_destroy(&s->triggers);
 
     jsonrpc_session_close(s->js);
-    list_remove(&s->node);
+    ovs_list_remove(&s->node);
     s->remote->server->n_sessions--;
     ovsdb_session_destroy(&s->up);
     free(s);
@@ -529,7 +549,6 @@ ovsdb_jsonrpc_session_get_memory_usage(const struct ovsdb_jsonrpc_session *s,
                                        struct simap *usage)
 {
     simap_increase(usage, "triggers", hmap_count(&s->triggers));
-    simap_increase(usage, "monitors", hmap_count(&s->monitors));
     simap_increase(usage, "backlog", jsonrpc_session_get_backlog(s->js));
 }
 
@@ -587,24 +606,37 @@ ovsdb_jsonrpc_session_set_all_options(
     }
 }
 
+/* Sets the 'status' of for the 'remote' with an outgoing connection.   */
 static bool
-ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
-                                 struct ovsdb_jsonrpc_remote_status *status)
+ovsdb_jsonrpc_active_session_get_status(
+    const struct ovsdb_jsonrpc_remote *remote,
+    struct ovsdb_jsonrpc_remote_status *status)
 {
+    const struct ovs_list *sessions = &remote->sessions;
     const struct ovsdb_jsonrpc_session *s;
+
+    if (ovs_list_is_empty(sessions)) {
+        return false;
+    }
+
+    ovs_assert(ovs_list_is_singleton(sessions));
+    s = CONTAINER_OF(ovs_list_front(sessions), struct ovsdb_jsonrpc_session, node);
+    ovsdb_jsonrpc_session_get_status(s, status);
+    status->n_connections = 1;
+
+    return true;
+}
+
+static void
+ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_session *session,
+                                 struct ovsdb_jsonrpc_remote_status *status)
+{
+    const struct ovsdb_jsonrpc_session *s = session;
     const struct jsonrpc_session *js;
     struct ovsdb_lock_waiter *waiter;
     struct reconnect_stats rstats;
     struct ds locks_held, locks_waiting, locks_lost;
 
-    status->bound_port = (remote->listener
-                          ? pstream_get_bound_port(remote->listener)
-                          : htons(0));
-
-    if (list_is_empty(&remote->sessions)) {
-        return false;
-    }
-    s = CONTAINER_OF(remote->sessions.next, struct ovsdb_jsonrpc_session, node);
     js = s->js;
 
     status->is_connected = jsonrpc_session_is_connected(js);
@@ -634,10 +666,6 @@ ovsdb_jsonrpc_session_get_status(const struct ovsdb_jsonrpc_remote *remote,
     status->locks_held = ds_steal_cstr(&locks_held);
     status->locks_waiting = ds_steal_cstr(&locks_waiting);
     status->locks_lost = ds_steal_cstr(&locks_lost);
-
-    status->n_connections = list_size(&remote->sessions);
-
-    return true;
 }
 
 /* Examines 'request' to determine the database to which it relates, and then
@@ -844,12 +872,20 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
         if (!reply) {
             reply = execute_transaction(s, db, request);
         }
-    } else if (!strcmp(request->method, "monitor")) {
+    } else if (!strcmp(request->method, "monitor") ||
+               (monitor_cond_enable__ && !strcmp(request->method,
+                                                 "monitor_cond"))) {
         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
         if (!reply) {
+            int l = strlen(request->method) - strlen("monitor");
+            enum ovsdb_monitor_version version = l ? OVSDB_MONITOR_V2
+                                                   : OVSDB_MONITOR_V1;
             reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
-                                                 request->id);
+                                                 version, request->id);
         }
+    } else if (!strcmp(request->method, "monitor_cond_change")) {
+        reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
+                                                  request->id);
     } else if (!strcmp(request->method, "monitor_cancel")) {
         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
                                              request->id);
@@ -1022,7 +1058,7 @@ ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *s)
 static void
 ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
 {
-    while (!list_is_empty(&s->up.completions)) {
+    while (!ovs_list_is_empty(&s->up.completions)) {
         struct ovsdb_jsonrpc_trigger *t
             = CONTAINER_OF(s->up.completions.next,
                            struct ovsdb_jsonrpc_trigger, trigger.node);
@@ -1039,6 +1075,8 @@ struct ovsdb_jsonrpc_monitor {
     struct ovsdb_monitor *dbmon;
     uint64_t unflushed;         /* The first transaction that has not been
                                        flushed to the jsonrpc remote client. */
+    enum ovsdb_monitor_version version;
+    struct ovsdb_monitor_session_condition *condition;/* Session's condition */
 };
 
 static struct ovsdb_jsonrpc_monitor *
@@ -1066,21 +1104,27 @@ parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
 }
 
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
-ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
-                                    const struct ovsdb_table *table,
-                                    const struct json *monitor_request,
-                                    size_t *allocated_columns)
+ovsdb_jsonrpc_parse_monitor_request(
+                               struct ovsdb_monitor *dbmon,
+                               const struct ovsdb_table *table,
+                               struct ovsdb_monitor_session_condition *cond,
+                               const struct json *monitor_request)
 {
     const struct ovsdb_table_schema *ts = table->schema;
     enum ovsdb_monitor_selection select;
-    const struct json *columns, *select_json;
+    const struct json *columns, *select_json, *where = NULL;
     struct ovsdb_parser parser;
     struct ovsdb_error *error;
 
     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
+    if (cond) {
+        where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+    }
     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+
     select_json = ovsdb_parser_member(&parser, "select",
                                       OP_OBJECT | OP_OPTIONAL);
+
     error = ovsdb_parser_finish(&parser);
     if (error) {
         return error;
@@ -1133,8 +1177,12 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
                 return ovsdb_syntax_error(columns, NULL, "%s is not a valid "
                                           "column name", s);
             }
-            ovsdb_monitor_add_column(dbmon, table, column, select,
-                                     allocated_columns);
+            if (ovsdb_monitor_add_column(dbmon, table, column,
+                                         select, true)) {
+                return ovsdb_syntax_error(columns, NULL, "column %s "
+                                          "mentioned more than once",
+                                          column->name);
+            }
         }
     } else {
         struct shash_node *node;
@@ -1142,11 +1190,21 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
         SHASH_FOR_EACH (node, &ts->columns) {
             const struct ovsdb_column *column = node->data;
             if (column->index != OVSDB_COL_UUID) {
-                ovsdb_monitor_add_column(dbmon, table, column, select,
-                                         allocated_columns);
+                if (ovsdb_monitor_add_column(dbmon, table, column,
+                                             select, true)) {
+                    return ovsdb_syntax_error(columns, NULL, "column %s "
+                                              "mentioned more than once",
+                                              column->name);
+                }
             }
         }
     }
+    if (cond) {
+        error = ovsdb_monitor_table_condition_create(cond, table, where);
+        if (error) {
+            return error;
+        }
+    }
 
     return NULL;
 }
@@ -1154,9 +1212,11 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
 static struct jsonrpc_msg *
 ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
                              struct json *params,
+                             enum ovsdb_monitor_version version,
                              const struct json *request_id)
 {
     struct ovsdb_jsonrpc_monitor *m = NULL;
+    struct ovsdb_monitor *dbmon = NULL;
     struct json *monitor_id, *monitor_requests;
     struct ovsdb_error *error = NULL;
     struct shash_node *node;
@@ -1183,14 +1243,16 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     m->session = s;
     m->db = db;
     m->dbmon = ovsdb_monitor_create(db, m);
+    if (version == OVSDB_MONITOR_V2) {
+        m->condition = ovsdb_monitor_session_condition_create();
+    }
     m->unflushed = 0;
+    m->version = version;
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
     m->monitor_id = json_clone(monitor_id);
 
     SHASH_FOR_EACH (node, json_object(monitor_requests)) {
         const struct ovsdb_table *table;
-        const char *column_name;
-        size_t allocated_columns;
         const struct json *mr_value;
         size_t i;
 
@@ -1205,33 +1267,40 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
 
         /* Parse columns. */
         mr_value = node->data;
-        allocated_columns = 0;
         if (mr_value->type == JSON_ARRAY) {
             const struct json_array *array = &mr_value->u.array;
 
             for (i = 0; i < array->n; i++) {
-                error = ovsdb_jsonrpc_parse_monitor_request(
-                    m->dbmon, table, array->elems[i], &allocated_columns);
+                error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
+                                                            table,
+                                                            m->condition,
+                                                            array->elems[i]);
                 if (error) {
                     goto error;
                 }
             }
         } else {
-            error = ovsdb_jsonrpc_parse_monitor_request(
-                m->dbmon, table, mr_value, &allocated_columns);
+            error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
+                                                        table,
+                                                        m->condition,
+                                                        mr_value);
             if (error) {
                 goto error;
             }
         }
+    }
 
-        column_name = ovsdb_monitor_table_check_duplicates(m->dbmon, table);
+    dbmon = ovsdb_monitor_add(m->dbmon);
+    if (dbmon != m->dbmon) {
+        /* Found an exisiting dbmon, reuse the current one. */
+        ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+        ovsdb_monitor_add_jsonrpc_monitor(dbmon, m);
+        m->dbmon = dbmon;
+    }
 
-        if (column_name) {
-            error = ovsdb_syntax_error(mr_value, NULL, "column %s "
-                                       "mentioned more than once",
-                                        column_name);
-            goto error;
-        }
+    /* Only now we can bind session's condition to ovsdb_monitor */
+    if (m->condition) {
+        ovsdb_monitor_condition_bind(m->dbmon, m->condition);
     }
 
     ovsdb_monitor_get_initial(m->dbmon);
@@ -1249,6 +1318,136 @@ error:
     return jsonrpc_create_error(json, request_id);
 }
 
+static struct ovsdb_error *
+ovsdb_jsonrpc_parse_monitor_cond_change_request(
+                                struct ovsdb_jsonrpc_monitor *m,
+                                const struct ovsdb_table *table,
+                                const struct json *cond_change_req)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct json *condition, *columns;
+    struct ovsdb_parser parser;
+    struct ovsdb_error *error;
+
+    ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
+    columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+    condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+
+    error = ovsdb_parser_finish(&parser);
+    if (error) {
+        return error;
+    }
+
+    if (columns) {
+        error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
+                                   "is unsupported");
+        return error;
+    }
+    error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
+                                                 condition);
+
+    return error;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
+                                  struct json *params,
+                                  const struct json *request_id)
+{
+    struct ovsdb_error *error;
+    struct ovsdb_jsonrpc_monitor *m;
+    struct json *monitor_cond_change_reqs;
+    struct shash_node *node;
+    struct json *json;
+
+    if (json_array(params)->n != 3) {
+        error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+        goto error;
+    }
+
+    m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
+    if (!m) {
+        error = ovsdb_syntax_error(request_id, NULL,
+                                   "unknown monitor session");
+        goto error;
+    }
+
+    monitor_cond_change_reqs = params->u.array.elems[2];
+    if (monitor_cond_change_reqs->type != JSON_OBJECT) {
+        error =
+            ovsdb_syntax_error(NULL, NULL,
+                               "monitor-cond-change-requests must be object");
+        goto error;
+    }
+
+    SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
+        const struct ovsdb_table *table;
+        const struct json *mr_value;
+        size_t i;
+
+        table = ovsdb_get_table(m->db, node->name);
+        if (!table) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s", node->name);
+            goto error;
+        }
+        if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s in monitor session",
+                                       node->name);
+            goto error;
+        }
+
+        mr_value = node->data;
+        if (mr_value->type == JSON_ARRAY) {
+            const struct json_array *array = &mr_value->u.array;
+
+            for (i = 0; i < array->n; i++) {
+                error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
+                                            m, table, array->elems[i]);
+                if (error) {
+                    goto error;
+                }
+            }
+        } else {
+            error = ovsdb_syntax_error(
+                       NULL, NULL,
+                       "table %s no monitor-cond-change JSON array",
+                       node->name);
+            goto error;
+        }
+    }
+
+    /* Change monitor id */
+    hmap_remove(&s->monitors, &m->node);
+    json_destroy(m->monitor_id);
+    m->monitor_id = json_clone(params->u.array.elems[1]);
+    hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
+
+    /* Send the new update, if any,  represents the difference from the old
+     * condition and the new one. */
+    struct json *update_json;
+
+    update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
+                                    &m->unflushed, m->condition, m->version);
+    if (update_json) {
+        struct jsonrpc_msg *msg;
+        struct json *params;
+
+        params = json_array_create_2(json_clone(m->monitor_id), update_json);
+        msg = ovsdb_jsonrpc_create_notify(m, params);
+        jsonrpc_session_send(s->js, msg);
+    }
+
+    return jsonrpc_create_reply(json_object_create(), request_id);
+
+error:
+
+    json = ovsdb_error_to_json(error);
+    ovsdb_error_destroy(error);
+    return jsonrpc_create_error(json, request_id);
+}
+
 static struct jsonrpc_msg *
 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
                              struct json_array *params,
@@ -1285,7 +1484,13 @@ static struct json *
 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
                                      bool initial)
 {
-    return ovsdb_monitor_compose_update(m->dbmon, initial, &m->unflushed);
+
+    if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
+        return NULL;
+    }
+
+    return ovsdb_monitor_get_update(m->dbmon, initial, false,
+                                    &m->unflushed, m->condition, m->version);
 }
 
 static bool
@@ -1307,10 +1512,32 @@ ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m)
 {
     json_destroy(m->monitor_id);
     hmap_remove(&m->session->monitors, &m->node);
-    ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m);
+    ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+    ovsdb_monitor_session_condition_destroy(m->condition);
     free(m);
 }
 
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_create_notify(const struct ovsdb_jsonrpc_monitor *m,
+                            struct json *params)
+{
+    const char *method;
+
+    switch(m->version) {
+    case OVSDB_MONITOR_V1:
+        method = "update";
+        break;
+    case OVSDB_MONITOR_V2:
+        method = "update2";
+        break;
+    case OVSDB_MONITOR_VERSION_MAX:
+    default:
+        OVS_NOT_REACHED();
+    }
+
+    return jsonrpc_create_notify(method, params);
+}
+
 static void
 ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
 {
@@ -1325,8 +1552,15 @@ ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
             struct json *params;
 
             params = json_array_create_2(json_clone(m->monitor_id), json);
-            msg = jsonrpc_create_notify("update", params);
+            msg = ovsdb_jsonrpc_create_notify(m, params);
             jsonrpc_session_send(s->js, msg);
         }
     }
 }
+
+void
+ovsdb_jsonrpc_disable_monitor_cond(void)
+{
+    /* Once disabled, it is not possible to re-enable it. */
+    monitor_cond_enable__ = false;
+}