ovsdb-client: support monitor-cond method
[cascardo/ovs.git] / ovsdb / jsonrpc-server.c
index 3a69a85..864fb03 100644 (file)
@@ -28,6 +28,7 @@
 #include "ovsdb-error.h"
 #include "ovsdb-parser.h"
 #include "ovsdb.h"
+#include "condition.h"
 #include "poll-loop.h"
 #include "reconnect.h"
 #include "row.h"
@@ -45,9 +46,9 @@ VLOG_DEFINE_THIS_MODULE(ovsdb_jsonrpc_server);
 struct ovsdb_jsonrpc_remote;
 struct ovsdb_jsonrpc_session;
 
-/* Set false to defeature monitor2, causing jsonrpc to respond to monitor2
- * method with an error.  */
-static bool monitor2_enable__ = true;
+/* Set false to defeature monitor_cond, causing jsonrpc to respond to
+ * monitor_cond method with an error.  */
+static bool monitor_cond_enable__ = true;
 
 /* Message rate-limiting. */
 static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 5);
@@ -89,6 +90,10 @@ static void ovsdb_jsonrpc_trigger_complete_done(
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
     struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
     enum ovsdb_monitor_version, const struct json *request_id);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_change(
+    struct ovsdb_jsonrpc_session *s,
+    struct json *params,
+    const struct json *request_id);
 static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
     struct ovsdb_jsonrpc_session *,
     struct json_array *params,
@@ -98,6 +103,9 @@ static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
 static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *);
 static struct json *ovsdb_jsonrpc_monitor_compose_update(
     struct ovsdb_jsonrpc_monitor *monitor, bool initial);
+static struct jsonrpc_msg * ovsdb_jsonrpc_create_notify(
+                                        const struct ovsdb_jsonrpc_monitor *m,
+                                        struct json *params);
 
 \f
 /* JSON-RPC database server. */
@@ -414,7 +422,7 @@ static void ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_session_get_memory_usage(
     const struct ovsdb_jsonrpc_session *, struct simap *usage);
 static void ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *,
-                                             struct jsonrpc_msg *);
+                                              struct jsonrpc_msg *);
 static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session *,
                                              struct jsonrpc_msg *);
 
@@ -865,7 +873,8 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
             reply = execute_transaction(s, db, request);
         }
     } else if (!strcmp(request->method, "monitor") ||
-               (monitor2_enable__ && !strcmp(request->method, "monitor2"))) {
+               (monitor_cond_enable__ && !strcmp(request->method,
+                                                 "monitor_cond"))) {
         struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
         if (!reply) {
             int l = strlen(request->method) - strlen("monitor");
@@ -874,6 +883,9 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
             reply = ovsdb_jsonrpc_monitor_create(s, db, request->params,
                                                  version, request->id);
         }
+    } else if (!strcmp(request->method, "monitor_cond_change")) {
+        reply = ovsdb_jsonrpc_monitor_cond_change(s, request->params,
+                                                  request->id);
     } else if (!strcmp(request->method, "monitor_cancel")) {
         reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
                                              request->id);
@@ -1064,6 +1076,7 @@ struct ovsdb_jsonrpc_monitor {
     uint64_t unflushed;         /* The first transaction that has not been
                                        flushed to the jsonrpc remote client. */
     enum ovsdb_monitor_version version;
+    struct ovsdb_monitor_session_condition *condition;/* Session's condition */
 };
 
 static struct ovsdb_jsonrpc_monitor *
@@ -1091,20 +1104,27 @@ parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
 }
 
 static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
-ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
-                                    const struct ovsdb_table *table,
-                                    const struct json *monitor_request)
+ovsdb_jsonrpc_parse_monitor_request(
+                               struct ovsdb_monitor *dbmon,
+                               const struct ovsdb_table *table,
+                               struct ovsdb_monitor_session_condition *cond,
+                               const struct json *monitor_request)
 {
     const struct ovsdb_table_schema *ts = table->schema;
     enum ovsdb_monitor_selection select;
-    const struct json *columns, *select_json;
+    const struct json *columns, *select_json, *where = NULL;
     struct ovsdb_parser parser;
     struct ovsdb_error *error;
 
     ovsdb_parser_init(&parser, monitor_request, "table %s", ts->name);
+    if (cond) {
+        where = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+    }
     columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+
     select_json = ovsdb_parser_member(&parser, "select",
                                       OP_OBJECT | OP_OPTIONAL);
+
     error = ovsdb_parser_finish(&parser);
     if (error) {
         return error;
@@ -1179,6 +1199,12 @@ ovsdb_jsonrpc_parse_monitor_request(struct ovsdb_monitor *dbmon,
             }
         }
     }
+    if (cond) {
+        error = ovsdb_monitor_table_condition_create(cond, table, where);
+        if (error) {
+            return error;
+        }
+    }
 
     return NULL;
 }
@@ -1217,6 +1243,9 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
     m->session = s;
     m->db = db;
     m->dbmon = ovsdb_monitor_create(db, m);
+    if (version == OVSDB_MONITOR_V2) {
+        m->condition = ovsdb_monitor_session_condition_create();
+    }
     m->unflushed = 0;
     m->version = version;
     hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
@@ -1244,6 +1273,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
             for (i = 0; i < array->n; i++) {
                 error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
                                                             table,
+                                                            m->condition,
                                                             array->elems[i]);
                 if (error) {
                     goto error;
@@ -1252,6 +1282,7 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         } else {
             error = ovsdb_jsonrpc_parse_monitor_request(m->dbmon,
                                                         table,
+                                                        m->condition,
                                                         mr_value);
             if (error) {
                 goto error;
@@ -1267,6 +1298,11 @@ ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s, struct ovsdb *db,
         m->dbmon = dbmon;
     }
 
+    /* Only now we can bind session's condition to ovsdb_monitor */
+    if (m->condition) {
+        ovsdb_monitor_condition_bind(m->dbmon, m->condition);
+    }
+
     ovsdb_monitor_get_initial(m->dbmon);
     json = ovsdb_jsonrpc_monitor_compose_update(m, true);
     json = json ? json : json_object_create();
@@ -1282,6 +1318,136 @@ error:
     return jsonrpc_create_error(json, request_id);
 }
 
+static struct ovsdb_error *
+ovsdb_jsonrpc_parse_monitor_cond_change_request(
+                                struct ovsdb_jsonrpc_monitor *m,
+                                const struct ovsdb_table *table,
+                                const struct json *cond_change_req)
+{
+    const struct ovsdb_table_schema *ts = table->schema;
+    const struct json *condition, *columns;
+    struct ovsdb_parser parser;
+    struct ovsdb_error *error;
+
+    ovsdb_parser_init(&parser, cond_change_req, "table %s", ts->name);
+    columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY | OP_OPTIONAL);
+    condition = ovsdb_parser_member(&parser, "where", OP_ARRAY | OP_OPTIONAL);
+
+    error = ovsdb_parser_finish(&parser);
+    if (error) {
+        return error;
+    }
+
+    if (columns) {
+        error = ovsdb_syntax_error(cond_change_req, NULL, "changing columns "
+                                   "is unsupported");
+        return error;
+    }
+    error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition, table,
+                                                 condition);
+
+    return error;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cond_change(struct ovsdb_jsonrpc_session *s,
+                                  struct json *params,
+                                  const struct json *request_id)
+{
+    struct ovsdb_error *error;
+    struct ovsdb_jsonrpc_monitor *m;
+    struct json *monitor_cond_change_reqs;
+    struct shash_node *node;
+    struct json *json;
+
+    if (json_array(params)->n != 3) {
+        error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+        goto error;
+    }
+
+    m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
+    if (!m) {
+        error = ovsdb_syntax_error(request_id, NULL,
+                                   "unknown monitor session");
+        goto error;
+    }
+
+    monitor_cond_change_reqs = params->u.array.elems[2];
+    if (monitor_cond_change_reqs->type != JSON_OBJECT) {
+        error =
+            ovsdb_syntax_error(NULL, NULL,
+                               "monitor-cond-change-requests must be object");
+        goto error;
+    }
+
+    SHASH_FOR_EACH (node, json_object(monitor_cond_change_reqs)) {
+        const struct ovsdb_table *table;
+        const struct json *mr_value;
+        size_t i;
+
+        table = ovsdb_get_table(m->db, node->name);
+        if (!table) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s", node->name);
+            goto error;
+        }
+        if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s in monitor session",
+                                       node->name);
+            goto error;
+        }
+
+        mr_value = node->data;
+        if (mr_value->type == JSON_ARRAY) {
+            const struct json_array *array = &mr_value->u.array;
+
+            for (i = 0; i < array->n; i++) {
+                error = ovsdb_jsonrpc_parse_monitor_cond_change_request(
+                                            m, table, array->elems[i]);
+                if (error) {
+                    goto error;
+                }
+            }
+        } else {
+            error = ovsdb_syntax_error(
+                       NULL, NULL,
+                       "table %s no monitor-cond-change JSON array",
+                       node->name);
+            goto error;
+        }
+    }
+
+    /* Change monitor id */
+    hmap_remove(&s->monitors, &m->node);
+    json_destroy(m->monitor_id);
+    m->monitor_id = json_clone(params->u.array.elems[1]);
+    hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
+
+    /* Send the new update, if any,  represents the difference from the old
+     * condition and the new one. */
+    struct json *update_json;
+
+    update_json = ovsdb_monitor_get_update(m->dbmon, false, true,
+                                    &m->unflushed, m->condition, m->version);
+    if (update_json) {
+        struct jsonrpc_msg *msg;
+        struct json *params;
+
+        params = json_array_create_2(json_clone(m->monitor_id), update_json);
+        msg = ovsdb_jsonrpc_create_notify(m, params);
+        jsonrpc_session_send(s->js, msg);
+    }
+
+    return jsonrpc_create_reply(json_object_create(), request_id);
+
+error:
+
+    json = ovsdb_error_to_json(error);
+    ovsdb_error_destroy(error);
+    return jsonrpc_create_error(json, request_id);
+}
+
 static struct jsonrpc_msg *
 ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
                              struct json_array *params,
@@ -1318,12 +1484,13 @@ static struct json *
 ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
                                      bool initial)
 {
+
     if (!ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
         return NULL;
     }
 
-    return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
-                                    m->version);
+    return ovsdb_monitor_get_update(m->dbmon, initial, false,
+                                    &m->unflushed, m->condition, m->version);
 }
 
 static bool
@@ -1346,6 +1513,7 @@ ovsdb_jsonrpc_monitor_destroy(struct ovsdb_jsonrpc_monitor *m)
     json_destroy(m->monitor_id);
     hmap_remove(&m->session->monitors, &m->node);
     ovsdb_monitor_remove_jsonrpc_monitor(m->dbmon, m, m->unflushed);
+    ovsdb_monitor_session_condition_destroy(m->condition);
     free(m);
 }
 
@@ -1391,8 +1559,8 @@ ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
 }
 
 void
-ovsdb_jsonrpc_disable_monitor2(void)
+ovsdb_jsonrpc_disable_monitor_cond(void)
 {
     /* Once disabled, it is not possible to re-enable it. */
-    monitor2_enable__ = false;
+    monitor_cond_enable__ = false;
 }