ovsdb: Monitor support.
authorBen Pfaff <blp@nicira.com>
Mon, 16 Nov 2009 18:38:14 +0000 (10:38 -0800)
committerBen Pfaff <blp@nicira.com>
Wed, 18 Nov 2009 00:02:46 +0000 (16:02 -0800)
ovsdb/SPECS
ovsdb/column.c
ovsdb/column.h
ovsdb/jsonrpc-server.c
ovsdb/ovsdb-client.1.in
ovsdb/ovsdb-client.c
tests/automake.mk
tests/ovsdb-monitor.at [new file with mode: 0644]
tests/ovsdb-server.at
tests/ovsdb.at

index ae4d649..97c9a78 100644 (file)
@@ -281,6 +281,134 @@ form:
 
 The "cancel" notification itself has no reply.
 
+monitor
+.......
+
+Request object members:
+
+    "method": "monitor"                        required
+    "params": [<value>, <monitor-requests>]    required
+    "id": any JSON value except null           required
+
+<monitor-requests> is an object that maps from a table name to a
+<monitor-request>.
+
+Each <monitor-request> is an object with the following members:
+
+    "columns": [<column>*]            optional
+    "select": <monitor-select>        optional
+
+<monitor-select> is an object with the following members:
+
+    "initial": <boolean>              optional
+    "insert": <boolean>               optional
+    "delete": <boolean>               optional
+    "modify": <boolean>               optional
+
+Response object members:
+
+    "result": <table-updates>
+    "error": null
+    "id": same "id" as request
+
+This JSON-RPC request enables a client to replicate tables or subsets
+of tables.  Each <monitor-request> specifies a table to be replicated.
+The JSON-RPC response to the "monitor" includes the initial contents
+of each table.  Afterward, when changes to those tables are committed,
+the changes are automatically sent to the client using the "update"
+monitor notification.  This monitoring persists until the JSON-RPC
+session terminates or until the client sends a "monitor_cancel"
+JSON-RPC request.
+
+Each <monitor-request> describes how to monitor a table:
+
+    The circumstances in which an "update" notification is sent for a
+    row within the table are determined by <monitor-select>:
+
+        If "initial" is omitted or true, every row in the table is
+        sent as part of the reply to the "monitor" request.
+
+        If "insert" is omitted or true, "update" notifications are
+        sent for rows newly inserted into the table.
+
+        If "delete" is omitted or true, "update" notifications are
+        sent for rows deleted from the table.
+
+        If "modify" is omitted or true, "update" notifications are
+        sent whenever when a row in the table is modified.
+
+    The "columns" member specifies the columns whose values are
+    monitored.  If "columns" is omitted, all columns in the table,
+    except for "_uuid", are monitored.
+
+The "result" in the JSON-RPC response to the "monitor" request is a
+<table-updates> object (see below) that contains the contents of the
+tables for which "initial" rows are selected.  If no tables' initial
+contents are requested, then "result" is an empty object.
+
+update
+......
+
+Notification object members:
+
+    "method": "update"
+    "params": [<value>, <table-updates>]
+    "id": null
+
+The <value> in "params" is the same as the value passed as the <value>
+in "params" for the "monitor" request.
+
+<table-updates> is an object that maps from a table name to a
+<table-update>.
+
+A <table-update> is an object that maps from the row's UUID (as a
+36-byte string) to a <row-update> object.
+
+A <row-update> is an object with the following members:
+
+    "old": <row>         present for "delete" and "modify" updates
+    "new": <row>         present for "initial", "insert", and "modify" updates
+
+This JSON-RPC notification is sent from the server to the client to
+tell it about changes to a monitored table (or the initial state of a
+modified table).  Each table in which one or more rows has changed (or
+whose initial view is being presented) is represented in "updates".
+Each row that has changed (or whose initial view is being presented)
+is represented in its <table-update> as a member with its name taken
+from the row's _uuid member.  The corresponding value is a
+<row-update>:
+
+    The "old" member is present for "delete" and "modify" updates.
+    For "delete" updates, each monitored column is included.  For
+    "modify" updates, the prior value of each monitored column whose
+    value has changed is included (monitored columns that have not
+    changed are represented in "new").
+
+    The "new" member is present for "initial", "insert", and "modify"
+    updates.  For "initial" and "insert" updates, each monitored
+    column is included.  For "modify" updates, the new value of each
+    monitored column is included.
+
+monitor_cancel
+..............
+
+Request object members:
+
+    "method": "monitor_cancel"                              required
+    "params": [<value>]                                     required
+    "id": any JSON value except null                        required
+
+Response object members:
+
+    "result": {}
+    "error": null
+    "id": the request "id" member
+
+Cancels the ongoing table monitor request, identified by the <value>
+in "params" matching the <value> in "params" for an ongoing "monitor"
+request.  No more "update" messages will be sent for this table
+monitor.
+
 echo
 ....
 
index 1e8a2d0..fc21cdc 100644 (file)
@@ -174,6 +174,19 @@ error:
                               "array of distinct column names expected");
 }
 
+struct json *
+ovsdb_column_set_to_json(const struct ovsdb_column_set *set)
+{
+    struct json *json;
+    size_t i;
+
+    json = json_array_create_empty();
+    for (i = 0; i < set->n_columns; i++) {
+        json_array_add(json, json_string_create(set->columns[i]->name));
+    }
+    return json;
+}
+
 void
 ovsdb_column_set_add(struct ovsdb_column_set *set,
                      const struct ovsdb_column *column)
index 5942151..5fd39ae 100644 (file)
@@ -71,6 +71,7 @@ void ovsdb_column_set_clone(struct ovsdb_column_set *,
 struct ovsdb_error *ovsdb_column_set_from_json(const struct json *,
                                                const struct ovsdb_table *,
                                                struct ovsdb_column_set *);
+struct json *ovsdb_column_set_to_json(const struct ovsdb_column_set *);
 
 void ovsdb_column_set_add(struct ovsdb_column_set *,
                           const struct ovsdb_column *);
index 977d302..fc8b194 100644 (file)
 #include "column.h"
 #include "json.h"
 #include "jsonrpc.h"
+#include "ovsdb-error.h"
+#include "ovsdb-parser.h"
 #include "ovsdb.h"
 #include "reconnect.h"
+#include "row.h"
 #include "stream.h"
+#include "table.h"
 #include "timeval.h"
+#include "transaction.h"
 #include "trigger.h"
 
 #define THIS_MODULE VLM_ovsdb_jsonrpc_server
@@ -33,6 +38,7 @@
 
 struct ovsdb_jsonrpc_session;
 
+/* Sessions. */
 static void ovsdb_jsonrpc_session_create_active(struct ovsdb_jsonrpc_server *,
                                                 const char *name);
 static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
@@ -40,6 +46,7 @@ static void ovsdb_jsonrpc_session_create_passive(struct ovsdb_jsonrpc_server *,
 static void ovsdb_jsonrpc_session_run_all(struct ovsdb_jsonrpc_server *);
 static void ovsdb_jsonrpc_session_wait_all(struct ovsdb_jsonrpc_server *);
 
+/* Triggers. */
 static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
                                          struct json *id, struct json *params);
 static struct ovsdb_jsonrpc_trigger *ovsdb_jsonrpc_trigger_find(
@@ -48,6 +55,15 @@ static void ovsdb_jsonrpc_trigger_complete(struct ovsdb_jsonrpc_trigger *);
 static void ovsdb_jsonrpc_trigger_complete_all(struct ovsdb_jsonrpc_session *);
 static void ovsdb_jsonrpc_trigger_complete_done(
     struct ovsdb_jsonrpc_session *);
+
+/* Monitors. */
+static struct json *ovsdb_jsonrpc_monitor_create(
+    struct ovsdb_jsonrpc_session *, struct json *params);
+static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
+    struct ovsdb_jsonrpc_session *,
+    struct json_array *params,
+    const struct json *request_id);
+static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
 \f
 /* JSON-RPC database server. */
 
@@ -151,6 +167,9 @@ struct ovsdb_jsonrpc_session {
     struct hmap triggers;       /* Hmap of "struct ovsdb_jsonrpc_trigger"s. */
     struct list completions;    /* Completed triggers. */
 
+    /* Monitors. */
+    struct hmap monitors;       /* Hmap of "struct ovsdb_jsonrpc_monitor"s. */
+
     /* Connecting and reconnecting. */
     struct reconnect *reconnect; /* For back-off. */
     bool active;                /* Active or passive connection? */
@@ -177,6 +196,7 @@ ovsdb_jsonrpc_session_create(struct ovsdb_jsonrpc_server *svr,
     s->server = svr;
     list_push_back(&svr->sessions, &s->node);
     hmap_init(&s->triggers);
+    hmap_init(&s->monitors);
     list_init(&s->completions);
     s->reconnect = reconnect_create(time_msec());
     reconnect_set_name(s->reconnect, name);
@@ -221,6 +241,7 @@ ovsdb_jsonrpc_session_disconnect(struct ovsdb_jsonrpc_session *s)
     if (s->rpc) {
         jsonrpc_error(s->rpc, EOF);
         ovsdb_jsonrpc_trigger_complete_all(s);
+        ovsdb_jsonrpc_monitor_remove_all(s);
         jsonrpc_close(s->rpc);
         s->rpc = NULL;
     } else if (s->stream) {
@@ -375,6 +396,12 @@ ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
 
     if (!strcmp(request->method, "transact")) {
         reply = execute_transaction(s, request);
+    } else if (!strcmp(request->method, "monitor")) {
+        reply = jsonrpc_create_reply(
+            ovsdb_jsonrpc_monitor_create(s, request->params), request->id);
+    } else if (!strcmp(request->method, "monitor_cancel")) {
+        reply = ovsdb_jsonrpc_monitor_cancel(s, json_array(request->params),
+                                             request->id);
     } else if (!strcmp(request->method, "get_schema")) {
         reply = jsonrpc_create_reply(
             ovsdb_schema_to_json(s->server->db->schema), request->id);
@@ -522,3 +549,402 @@ ovsdb_jsonrpc_trigger_complete_done(struct ovsdb_jsonrpc_session *s)
         ovsdb_jsonrpc_trigger_complete(t);
     }
 }
+\f
+/* JSON-RPC database table monitors. */
+
+enum ovsdb_jsonrpc_monitor_selection {
+    OJMS_INITIAL = 1 << 0,      /* All rows when monitor is created. */
+    OJMS_INSERT = 1 << 1,       /* New rows. */
+    OJMS_DELETE = 1 << 2,       /* Deleted rows. */
+    OJMS_MODIFY = 1 << 3        /* Modified rows. */
+};
+
+struct ovsdb_jsonrpc_monitor_table {
+    const struct ovsdb_table *table;
+    enum ovsdb_jsonrpc_monitor_selection select;
+    struct ovsdb_column_set columns;
+};
+
+struct ovsdb_jsonrpc_monitor {
+    struct ovsdb_replica replica;
+    struct ovsdb_jsonrpc_session *session;
+    struct hmap_node node;      /* In ovsdb_jsonrpc_session's "monitors". */
+
+    struct json *monitor_id;
+    struct shash tables;     /* Holds "struct ovsdb_jsonrpc_monitor_table"s. */
+};
+
+static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
+
+struct ovsdb_jsonrpc_monitor *ovsdb_jsonrpc_monitor_find(
+    struct ovsdb_jsonrpc_session *, const struct json *monitor_id);
+static void ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *);
+static struct json *ovsdb_jsonrpc_monitor_get_initial(
+    const struct ovsdb_jsonrpc_monitor *);
+
+static bool
+parse_bool(struct ovsdb_parser *parser, const char *name, bool default_value)
+{
+    const struct json *json;
+
+    json = ovsdb_parser_member(parser, name, OP_BOOLEAN | OP_OPTIONAL);
+    return json ? json_boolean(json) : default_value;
+}
+
+struct ovsdb_jsonrpc_monitor *
+ovsdb_jsonrpc_monitor_find(struct ovsdb_jsonrpc_session *s,
+                           const struct json *monitor_id)
+{
+    struct ovsdb_jsonrpc_monitor *m;
+
+    HMAP_FOR_EACH_WITH_HASH (m, struct ovsdb_jsonrpc_monitor, node,
+                             json_hash(monitor_id, 0), &s->monitors) {
+        if (json_equal(m->monitor_id, monitor_id)) {
+            return m;
+        }
+    }
+
+    return NULL;
+}
+
+static struct json *
+ovsdb_jsonrpc_monitor_create(struct ovsdb_jsonrpc_session *s,
+                             struct json *params)
+{
+    struct ovsdb_jsonrpc_monitor *m = NULL;
+    struct json *monitor_id, *monitor_requests;
+    struct ovsdb_error *error = NULL;
+    struct shash_node *node;
+    struct json *json;
+
+    if (json_array(params)->n != 2) {
+        error = ovsdb_syntax_error(params, NULL, "invalid parameters");
+        goto error;
+    }
+    monitor_id = params->u.array.elems[0];
+    monitor_requests = params->u.array.elems[1];
+    if (monitor_requests->type != JSON_OBJECT) {
+        error = ovsdb_syntax_error(monitor_requests, NULL,
+                                   "monitor-requests must be object");
+        goto error;
+    }
+
+    if (ovsdb_jsonrpc_monitor_find(s, monitor_id)) {
+        error = ovsdb_syntax_error(monitor_id, NULL, "duplicate monitor ID");
+        goto error;
+    }
+
+    m = xzalloc(sizeof *m);
+    ovsdb_replica_init(&m->replica, &ovsdb_jsonrpc_replica_class);
+    ovsdb_add_replica(s->server->db, &m->replica);
+    m->session = s;
+    hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
+    m->monitor_id = json_clone(monitor_id);
+    shash_init(&m->tables);
+
+    SHASH_FOR_EACH (node, json_object(monitor_requests)) {
+        const struct ovsdb_table *table;
+        struct ovsdb_jsonrpc_monitor_table *mt;
+        const struct json *columns_json, *select_json;
+        struct ovsdb_parser parser;
+
+        table = ovsdb_get_table(s->server->db, node->name);
+        if (!table) {
+            error = ovsdb_syntax_error(NULL, NULL,
+                                       "no table named %s", node->name);
+            goto error;
+        }
+
+        mt = xzalloc(sizeof *mt);
+        mt->table = table;
+        mt->select = OJMS_INITIAL | OJMS_INSERT | OJMS_DELETE | OJMS_MODIFY;
+        ovsdb_column_set_init(&mt->columns);
+        shash_add(&m->tables, table->schema->name, mt);
+
+        ovsdb_parser_init(&parser, node->data, "table %s", node->name);
+        columns_json = ovsdb_parser_member(&parser, "columns",
+                                           OP_ARRAY | OP_OPTIONAL);
+        select_json = ovsdb_parser_member(&parser, "select",
+                                          OP_OBJECT | OP_OPTIONAL);
+        error = ovsdb_parser_finish(&parser);
+        if (error) {
+            goto error;
+        }
+
+        if (columns_json) {
+            error = ovsdb_column_set_from_json(columns_json, table,
+                                               &mt->columns);
+            if (error) {
+                goto error;
+            }
+        } else {
+            struct shash_node *node;
+
+            SHASH_FOR_EACH (node, &table->schema->columns) {
+                const struct ovsdb_column *column = node->data;
+                if (column->index != OVSDB_COL_UUID) {
+                    ovsdb_column_set_add(&mt->columns, column);
+                }
+            }
+        }
+
+        if (select_json) {
+            mt->select = 0;
+            ovsdb_parser_init(&parser, select_json, "table %s select",
+                              table->schema->name);
+            if (parse_bool(&parser, "initial", true)) {
+                mt->select |= OJMS_INITIAL;
+            }
+            if (parse_bool(&parser, "insert", true)) {
+                mt->select |= OJMS_INSERT;
+            }
+            if (parse_bool(&parser, "delete", true)) {
+                mt->select |= OJMS_DELETE;
+            }
+            if (parse_bool(&parser, "modify", true)) {
+                mt->select |= OJMS_MODIFY;
+            }
+            error = ovsdb_parser_finish(&parser);
+            if (error) {
+                goto error;
+            }
+        }
+    }
+
+    return ovsdb_jsonrpc_monitor_get_initial(m);
+
+error:
+    ovsdb_remove_replica(s->server->db, &m->replica);
+
+    json = ovsdb_error_to_json(error);
+    ovsdb_error_destroy(error);
+    return json;
+}
+
+static struct jsonrpc_msg *
+ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
+                             struct json_array *params,
+                             const struct json *request_id)
+{
+    if (params->n != 1) {
+        return jsonrpc_create_error(json_string_create("invalid parameters"),
+                                    request_id);
+    } else {
+        struct ovsdb_jsonrpc_monitor *m;
+
+        m = ovsdb_jsonrpc_monitor_find(s, params->elems[0]);
+        if (!m) {
+            return jsonrpc_create_error(json_string_create("unknown monitor"),
+                                        request_id);
+        } else {
+            ovsdb_remove_replica(s->server->db, &m->replica);
+            return jsonrpc_create_reply(json_object_create(), request_id);
+        }
+    }
+}
+
+static void
+ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *s)
+{
+    struct ovsdb_jsonrpc_monitor *m, *next;
+
+    HMAP_FOR_EACH_SAFE (m, next,
+                        struct ovsdb_jsonrpc_monitor, node, &s->monitors) {
+        ovsdb_remove_replica(s->server->db, &m->replica);
+    }
+}
+
+static struct ovsdb_jsonrpc_monitor *
+ovsdb_jsonrpc_monitor_cast(struct ovsdb_replica *replica)
+{
+    assert(replica->class == &ovsdb_jsonrpc_replica_class);
+    return CONTAINER_OF(replica, struct ovsdb_jsonrpc_monitor, replica);
+}
+
+struct ovsdb_jsonrpc_monitor_aux {
+    bool initial;               /* Sending initial contents of table? */
+    const struct ovsdb_jsonrpc_monitor *monitor;
+    struct json *json;          /* JSON for the whole transaction. */
+
+    /* Current table.  */
+    struct ovsdb_jsonrpc_monitor_table *mt;
+    struct json *table_json;    /* JSON for table's transaction. */
+};
+
+static bool
+ovsdb_jsonrpc_monitor_change_cb(const struct ovsdb_row *old,
+                                const struct ovsdb_row *new,
+                                void *aux_)
+{
+    struct ovsdb_jsonrpc_monitor_aux *aux = aux_;
+    const struct ovsdb_jsonrpc_monitor *m = aux->monitor;
+    struct ovsdb_table *table = new ? new->table : old->table;
+    enum ovsdb_jsonrpc_monitor_selection type;
+    struct json *old_json, *new_json;
+    struct json *row_json;
+    char uuid[UUID_LEN + 1];
+    int n_changed;
+    size_t i;
+
+    if (!aux->mt || table != aux->mt->table) {
+        aux->mt = shash_find_data(&m->tables, table->schema->name);
+        aux->table_json = NULL;
+        if (!aux->mt) {
+            /* We don't care about rows in this table at all.  Tell the caller
+             * to skip it.  */
+            return false;
+        }
+    }
+
+    type = (aux->initial ? OJMS_INITIAL
+            : !old ? OJMS_INSERT
+            : !new ? OJMS_DELETE
+            : OJMS_MODIFY);
+    if (!(aux->mt->select & type)) {
+        /* We don't care about this type of change (but do want to be called
+         * back for changes to other rows in the same table). */
+        return true;
+    }
+
+    old_json = new_json = NULL;
+    n_changed = 0;
+    for (i = 0; i < aux->mt->columns.n_columns; i++) {
+        const struct ovsdb_column *column = aux->mt->columns.columns[i];
+        unsigned int idx = column->index;
+        bool changed = false;
+
+        if (type == OJMS_MODIFY) {
+            changed = !ovsdb_datum_equals(&old->fields[idx],
+                                          &new->fields[idx], &column->type);
+            n_changed += changed;
+        }
+        if (changed || type == OJMS_DELETE) {
+            if (!old_json) {
+                old_json = json_object_create();
+            }
+            json_object_put(old_json, column->name,
+                            ovsdb_datum_to_json(&old->fields[idx],
+                                                &column->type));
+        }
+        if (type & (OJMS_INITIAL | OJMS_INSERT | OJMS_MODIFY)) {
+            if (!new_json) {
+                new_json = json_object_create();
+            }
+            json_object_put(new_json, column->name,
+                            ovsdb_datum_to_json(&new->fields[idx],
+                                                &column->type));
+        }
+    }
+    if ((type == OJMS_MODIFY && !n_changed) || (!old_json && !new_json)) {
+        /* No reportable changes. */
+        json_destroy(old_json);
+        json_destroy(new_json);
+        return true;
+    }
+
+    /* Create JSON object for transaction overall. */
+    if (!aux->json) {
+        aux->json = json_object_create();
+    }
+
+    /* Create JSON object for transaction on this table. */
+    if (!aux->table_json) {
+        aux->table_json = json_object_create();
+        json_object_put(aux->json, aux->mt->table->schema->name,
+                        aux->table_json);
+    }
+
+    /* Create JSON object for transaction on this row. */
+    row_json = json_object_create();
+    if (old_json) {
+        json_object_put(row_json, "old", old_json);
+    }
+    if (new_json) {
+        json_object_put(row_json, "new", new_json);
+    }
+
+    /* Add JSON row to JSON table. */
+    snprintf(uuid, sizeof uuid,
+             UUID_FMT, UUID_ARGS(ovsdb_row_get_uuid(new ? new : old)));
+    json_object_put(aux->table_json, uuid, row_json);
+
+    return true;
+}
+
+static void
+ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
+                               const struct ovsdb_jsonrpc_monitor *m,
+                               bool initial)
+{
+    aux->initial = initial;
+    aux->monitor = m;
+    aux->json = NULL;
+    aux->mt = NULL;
+    aux->table_json = NULL;
+}
+
+static struct ovsdb_error *
+ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica *replica,
+                             const struct ovsdb_txn *txn, bool durable UNUSED)
+{
+    struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
+    struct ovsdb_jsonrpc_monitor_aux aux;
+
+    ovsdb_jsonrpc_monitor_init_aux(&aux, m, false);
+    ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
+    if (aux.json) {
+        struct jsonrpc_msg *msg;
+        struct json *params;
+
+        params = json_array_create_2(json_clone(aux.monitor->monitor_id),
+                                     aux.json);
+        msg = jsonrpc_create_notify("update", params);
+        jsonrpc_send(aux.monitor->session->rpc, msg);
+    }
+
+    return NULL;
+}
+
+static struct json *
+ovsdb_jsonrpc_monitor_get_initial(const struct ovsdb_jsonrpc_monitor *m)
+{
+    struct ovsdb_jsonrpc_monitor_aux aux;
+    struct shash_node *node;
+
+    ovsdb_jsonrpc_monitor_init_aux(&aux, m, true);
+    SHASH_FOR_EACH (node, &m->tables) {
+        struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+
+        if (mt->select & OJMS_INITIAL) {
+            struct ovsdb_row *row;
+
+            HMAP_FOR_EACH (row, struct ovsdb_row, hmap_node,
+                           &mt->table->rows) {
+                ovsdb_jsonrpc_monitor_change_cb(NULL, row, &aux);
+            }
+        }
+    }
+    return aux.json ? aux.json : json_object_create();
+}
+
+static void
+ovsdb_jsonrpc_monitor_destroy(struct ovsdb_replica *replica)
+{
+    struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
+    struct shash_node *node;
+
+    json_destroy(m->monitor_id);
+    SHASH_FOR_EACH (node, &m->tables) {
+        struct ovsdb_jsonrpc_monitor_table *mt = node->data;
+        ovsdb_column_set_destroy(&mt->columns);
+        free(mt);
+    }
+    shash_destroy(&m->tables);
+    hmap_remove(&m->session->monitors, &m->node);
+    free(m);
+}
+
+static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {
+    ovsdb_jsonrpc_monitor_commit,
+    ovsdb_jsonrpc_monitor_destroy
+};
index 0337c3d..9825d32 100644 (file)
@@ -20,6 +20,10 @@ ovsdb\-client \- command-line interface to \fBovsdb-server\fR(1)
 .br
 \fBovsdb\-client \fR[\fIoptions\fR] \fBtransact\fI server transaction\fR
 .br
+\fBovsdb\-client \fR[\fIoptions\fR] \fBmonitor\fI server table\fR
+[\fIcolumn\fR[\fB,\fIcolumn\fR]...]
+[\fIselect\fR[\fB,\fIselect\fR]...]
+.br
 \fBovsdb\-client help\fR
 .IP "Output formatting options:"
 [\fB--format=\fIformat\fR]
@@ -62,10 +66,28 @@ Connects to \fIserver\fR, retrieves the database schema, and prints
 a table listing the names, type, and comment (if any) on each column.  If
 \fItable\fR is specified, only columns in that table are listed;
 otherwise, the tables include columns in all tables.
-.IP "\fBovsdb\-client \fR[\fIoptions\fR] \fBtransact\fI server transaction\fR"
+.
+.IP "\fBtransact\fI server transaction\fR"
 Connects to \fIserver\fR, sends it the specified \fItransaction\fR,
 which must be a JSON array containing one or more valid OVSDB
 operations, and prints the received reply on stdout.
+.
+.IP "\fBmonitor\fI server table\fR [\fIcolumn\fR[\fB,\fIcolumn\fR]...] [\fIselect\fR[\fB,\fIselect\fR]...]"
+Connects to \fIserver\fR and monitors the contents of \fItable\fR.  By
+default, the initial contents of \fItable\fR are printed, followed by
+each change as it occurs.  If at least one \fIcolumn\fR is specified,
+only those columns are monitored.  If at least one \fIselect\fR is
+specified, they are interpreted as follows:
+.RS
+.IP "\fBinitial\fR"
+Print the initial contents of the specified columns.
+.IP "\fBinsert\fR"
+Print newly inserted rows.
+.IP "\fBdelete\fR"
+Print deleted rows.
+.IP "\fBmodify\fR"
+Print old and new values of modified rows.
+.RE
 .SH OPTIONS
 .SS "Output Formatting Options"
 Much of the output from \fBovsdb\-client\fR is in the form of tables.
index 249cafe..6e00681 100644 (file)
@@ -149,7 +149,10 @@ usage(void)
            "    list columns in TABLE (or all tables) on SERVER\n"
            "\n  transact SERVER TRANSACTION\n"
            "    run TRANSACTION (a JSON array of operations) on SERVER\n"
-           "    and print the results as JSON on stdout\n",
+           "    and print the results as JSON on stdout\n"
+           "\n  monitor SERVER TABLE [COLUMN,...] [SELECT,...]\n"
+           "    monitor contents of (COLUMNs in) TABLE on SERVER\n"
+           "    Valid SELECTs are: initial, insert, delete, modify\n",
            program_name, program_name);
     stream_usage("SERVER", true, true);
     printf("\nOutput formatting options:\n"
@@ -227,14 +230,12 @@ check_ovsdb_error(struct ovsdb_error *error)
 }
 
 static struct ovsdb_schema *
-fetch_schema(const char *server)
+fetch_schema_from_rpc(struct jsonrpc *rpc)
 {
     struct jsonrpc_msg *request, *reply;
     struct ovsdb_schema *schema;
-    struct jsonrpc *rpc;
     int error;
 
-    rpc = open_jsonrpc(server);
     request = jsonrpc_create_request("get_schema", json_array_create_empty());
     error = jsonrpc_transact_block(rpc, request, &reply);
     if (error) {
@@ -242,6 +243,18 @@ fetch_schema(const char *server)
     }
     check_ovsdb_error(ovsdb_schema_from_json(reply->result, &schema));
     jsonrpc_msg_destroy(reply);
+
+    return schema;
+}
+
+static struct ovsdb_schema *
+fetch_schema(const char *server)
+{
+    struct ovsdb_schema *schema;
+    struct jsonrpc *rpc;
+
+    rpc = open_jsonrpc(server);
+    schema = fetch_schema_from_rpc(rpc);
     jsonrpc_close(rpc);
 
     return schema;
@@ -266,6 +279,22 @@ table_init(struct table *table)
     memset(table, 0, sizeof *table);
 }
 
+static void
+table_destroy(struct table *table)
+{
+    size_t i;
+
+    for (i = 0; i < table->n_columns; i++) {
+        free(table->columns[i].heading);
+    }
+    free(table->columns);
+
+    for (i = 0; i < table->n_columns * table->n_rows; i++) {
+        free(table->cells[i]);
+    }
+    free(table->cells);
+}
+
 static void
 table_add_column(struct table *table, const char *heading, ...)
     PRINTF_FORMAT(2, 3);
@@ -590,7 +619,7 @@ do_list_columns(int argc UNUSED, char *argv[])
 }
 
 static void
-do_transact(int argc UNUSED, char *argv[] UNUSED)
+do_transact(int argc UNUSED, char *argv[])
 {
     struct jsonrpc_msg *request, *reply;
     struct json *transaction;
@@ -615,6 +644,188 @@ do_transact(int argc UNUSED, char *argv[] UNUSED)
     jsonrpc_close(rpc);
 }
 
+static void
+monitor_print_row(struct json *row, const char *type, const char *uuid,
+                  const struct ovsdb_column_set *columns, struct table *t)
+{
+    size_t i;
+
+    if (!row) {
+        ovs_error(0, "missing %s row", type);
+        return;
+    } else if (row->type != JSON_OBJECT) {
+        ovs_error(0, "<row> is not object");
+        return;
+    }
+
+    table_add_row(t);
+    table_add_cell(t, uuid);
+    table_add_cell(t, type);
+    for (i = 0; i < columns->n_columns; i++) {
+        const struct ovsdb_column *column = columns->columns[i];
+        struct json *value = shash_find_data(json_object(row), column->name);
+        if (value) {
+            table_add_cell_nocopy(t, json_to_string(value, JSSF_SORT));
+        } else {
+            table_add_cell(t, "");
+        }
+    }
+}
+
+static void
+monitor_print(struct json *table_updates,
+              const struct ovsdb_table_schema *table,
+              const struct ovsdb_column_set *columns, bool initial)
+{
+    struct json *table_update;
+    struct shash_node *node;
+    struct table t;
+    size_t i;
+
+    table_init(&t);
+
+    if (table_updates->type != JSON_OBJECT) {
+        ovs_error(0, "<table-updates> is not object");
+        return;
+    }
+    table_update = shash_find_data(json_object(table_updates), table->name);
+    if (!table_update) {
+        return;
+    }
+    if (table_update->type != JSON_OBJECT) {
+        ovs_error(0, "<table-update> is not object");
+        return;
+    }
+
+    table_add_column(&t, "row");
+    table_add_column(&t, "action");
+    for (i = 0; i < columns->n_columns; i++) {
+        table_add_column(&t, "%s", columns->columns[i]->name);
+    }
+    SHASH_FOR_EACH (node, json_object(table_update)) {
+        struct json *row_update = node->data;
+        struct json *old, *new;
+
+        if (row_update->type != JSON_OBJECT) {
+            ovs_error(0, "<row-update> is not object");
+            continue;
+        }
+        old = shash_find_data(json_object(row_update), "old");
+        new = shash_find_data(json_object(row_update), "new");
+        if (initial) {
+            monitor_print_row(new, "initial", node->name, columns, &t);
+        } else if (!old) {
+            monitor_print_row(new, "insert", node->name, columns, &t);
+        } else if (!new) {
+            monitor_print_row(old, "delete", node->name, columns, &t);
+        } else {
+            monitor_print_row(old, "old", node->name, columns, &t);
+            monitor_print_row(new, "new", "", columns, &t);
+        }
+    }
+    table_print(&t);
+    table_destroy(&t);
+}
+
+static void
+do_monitor(int argc, char *argv[])
+{
+    struct ovsdb_column_set columns = OVSDB_COLUMN_SET_INITIALIZER;
+    struct ovsdb_table_schema *table;
+    struct ovsdb_schema *schema;
+    struct jsonrpc_msg *request;
+    struct jsonrpc *rpc;
+    struct json *select, *monitor, *monitor_request, *monitor_requests,
+        *request_id;
+
+    rpc = open_jsonrpc(argv[1]);
+
+    schema = fetch_schema_from_rpc(rpc);
+    table = shash_find_data(&schema->tables, argv[2]);
+    if (!table) {
+        ovs_fatal(0, "%s: no table named \"%s\"", argv[1], argv[2]);
+    }
+
+    if (argc >= 4 && *argv[3] != '\0') {
+        char *save_ptr = NULL;
+        char *token;
+
+        for (token = strtok_r(argv[3], ",", &save_ptr); token != NULL;
+             token = strtok_r(NULL, ",", &save_ptr)) {
+            const struct ovsdb_column *column;
+            column = ovsdb_table_schema_get_column(table, token);
+            if (!column) {
+                ovs_fatal(0, "%s: table \"%s\" does not have a "
+                          "column named \"%s\"", argv[1], argv[2], token);
+            }
+            ovsdb_column_set_add(&columns, column);
+        }
+    } else {
+        struct shash_node *node;
+
+        SHASH_FOR_EACH (node, &table->columns) {
+            const struct ovsdb_column *column = node->data;
+            if (column->index != OVSDB_COL_UUID) {
+                ovsdb_column_set_add(&columns, column);
+            }
+        }
+    }
+
+    if (argc >= 5 && *argv[4] != '\0') {
+        char *save_ptr = NULL;
+        char *token;
+
+        select = json_object_create();
+        for (token = strtok_r(argv[4], ",", &save_ptr); token != NULL;
+             token = strtok_r(NULL, ",", &save_ptr)) {
+            json_object_put(select, token, json_boolean_create(true));
+        }
+    } else {
+        select = NULL;
+    }
+
+    monitor_request = json_object_create();
+    json_object_put(monitor_request,
+                    "columns", ovsdb_column_set_to_json(&columns));
+    if (select) {
+        json_object_put(monitor_request, "select", select);
+    }
+
+    monitor_requests = json_object_create();
+    json_object_put(monitor_requests, argv[2], monitor_request);
+
+    monitor = json_array_create_2(json_null_create(), monitor_requests);
+    request = jsonrpc_create_request("monitor", monitor);
+    request_id = json_clone(request->id);
+    jsonrpc_send(rpc, request);
+    for (;;) {
+        struct jsonrpc_msg *msg;
+        int error;
+
+        error = jsonrpc_recv_block(rpc, &msg);
+        if (error) {
+            ovs_fatal(error, "%s: receive failed", argv[1]);
+        }
+
+        if (msg->type == JSONRPC_REQUEST && !strcmp(msg->method, "echo")) {
+            jsonrpc_send(rpc, jsonrpc_create_reply(json_clone(msg->params),
+                                                   msg->id));
+        } else if (msg->type == JSONRPC_REPLY
+                   && json_equal(msg->id, request_id)) {
+            monitor_print(msg->result, table, &columns, true);
+        } else if (msg->type == JSONRPC_NOTIFY
+                   && !strcmp(msg->method, "update")) {
+            struct json *params = msg->params;
+            if (params->type == JSON_ARRAY
+                && params->u.array.n == 2
+                && params->u.array.elems[0]->type == JSON_NULL) {
+                monitor_print(params->u.array.elems[1],
+                              table, &columns, false);
+            }
+        }
+    }
+}
+
 static void
 do_help(int argc UNUSED, char *argv[] UNUSED)
 {
@@ -626,6 +837,7 @@ static const struct command all_commands[] = {
     { "list-tables", 1, 1, do_list_tables },
     { "list-columns", 1, 2, do_list_columns },
     { "transact", 2, 2, do_transact },
+    { "monitor", 2, 4, do_monitor },
     { "help", 0, INT_MAX, do_help },
     { NULL, 0, 0, NULL },
 };
index ba2d95f..b9d7483 100644 (file)
@@ -30,6 +30,7 @@ TESTSUITE_AT = \
        tests/ovsdb-trigger.at \
        tests/ovsdb-file.at \
        tests/ovsdb-server.at \
+       tests/ovsdb-monitor.at \
        tests/stp.at \
        tests/ovs-vsctl.at \
        tests/lcov-post.at
diff --git a/tests/ovsdb-monitor.at b/tests/ovsdb-monitor.at
new file mode 100644 (file)
index 0000000..cd62ca8
--- /dev/null
@@ -0,0 +1,46 @@
+AT_BANNER([OVSDB -- ovsdb-server monitors])
+
+# OVSDB_CHECK_MONITOR(TITLE, SCHEMA, [PRE-MONITOR-TXN], MONITOR-ARGS,
+#                     TRANSACTIONS, OUTPUT, [KEYWORDS])
+#
+# Creates a database with the given SCHEMA, starts an ovsdb-server on
+# that database, and runs each of the TRANSACTIONS (which should be a
+# quoted list of quoted strings) against it with ovsdb-client one at a
+# time.
+#
+# Checks that the overall output is OUTPUT, but UUIDs in the output
+# are replaced by markers of the form <N> where N is a number.  The
+# first unique UUID is replaced by <0>, the next by <1>, and so on.
+# If a given UUID appears more than once it is always replaced by the
+# same marker.
+#
+# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS.
+m4_define([OVSDB_CHECK_MONITOR], 
+  [AT_SETUP([$1])
+   AT_KEYWORDS([ovsdb server monitor positive $7])
+   AT_DATA([schema], [$2
+])
+   OVS_CHECK_LCOV([ovsdb-tool create db schema], [0], [stdout], [ignore])
+   m4_if([$3], [], [],
+     [OVS_CHECK_LCOV([ovsdb-tool transact db '$2'], [0], [ignore], [ignore])])
+   AT_CHECK([ovsdb-server --detach --pidfile=$PWD/server-pid --listen=punix:socket --unixctl=$PWD/unixctl db])
+   AT_CHECK([ovsdb-client monitor --format=csv unix:socket $4 > output & echo $! > monitor-pid], 
+            [0], [ignore], [ignore], [kill `cat server-pid`])
+   m4_foreach([txn], [$5],
+     [OVS_CHECK_LCOV([ovsdb-client transact unix:socket 'txn'], [0],
+                     [ignore], [ignore], [kill `cat server-pid monitor-pid`])])
+   AT_CHECK([ovs-appctl -t $PWD/unixctl -e exit], [0], [ignore], [ignore])
+   wait
+   AT_CHECK([perl $srcdir/uuidfilt.pl output], [0], [$6])
+   AT_CLEANUP])
+
+OVSDB_CHECK_MONITOR([monitor initially empty table],
+  [ORDINAL_SCHEMA],
+  [],
+  [ordinals],
+  [[[[{"op": "insert",
+       "table": "ordinals",
+       "row": {"number": 0, "name": "zero"}}]]]],
+  [[row,action,name,number,_version
+<0>,insert,"""zero""",0,"[""uuid"",""<1>""]"
+]])
index 95f0e90..5d5eebd 100644 (file)
@@ -20,7 +20,7 @@ m4_define([OVSDB_CHECK_EXECUTION],
    AT_DATA([schema], [$2
 ])
    OVS_CHECK_LCOV([ovsdb-tool create db schema], [0], [stdout], [ignore])
-   AT_CHECK([ovsdb-server --verbose --detach --pidfile=$PWD/pid --listen=punix:socket db])
+   AT_CHECK([ovsdb-server --detach --pidfile=$PWD/pid --listen=punix:socket db])
    m4_foreach([txn], [$3], 
      [OVS_CHECK_LCOV([ovsdb-client transact unix:socket 'txn'], [0], [stdout], [ignore],
      [test ! -e pid || kill `cat pid`])
index bdd15f9..690e998 100644 (file)
@@ -48,3 +48,4 @@ m4_include([tests/ovsdb-execution.at])
 m4_include([tests/ovsdb-trigger.at])
 m4_include([tests/ovsdb-file.at])
 m4_include([tests/ovsdb-server.at])
+m4_include([tests/ovsdb-monitor.at])