ovsdb-monitor: allow multiple jsonrpc monitors to share a single ovsdb
[cascardo/ovs.git] / ovsdb / monitor.c
index 18d5e3a..fb45ca6 100644 (file)
@@ -30,6 +30,7 @@
 #include "simap.h"
 #include "hash.h"
 #include "table.h"
+#include "hash.h"
 #include "timeval.h"
 #include "transaction.h"
 #include "jsonrpc-server.h"
@@ -38,6 +39,7 @@
 
 
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
+static struct hmap ovsdb_monitors = HMAP_INITIALIZER(&ovsdb_monitors);
 
 /*  Backend monitor.
  *
@@ -51,6 +53,7 @@ struct ovsdb_monitor {
     struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
     struct ovsdb *db;
     uint64_t n_transactions;      /* Count number of committed transactions. */
+    struct hmap_node hmap_node;   /* Elements within ovsdb_monitors.  */
 };
 
 struct jsonrpc_monitor_node {
@@ -108,8 +111,8 @@ struct ovsdb_monitor_table {
 };
 
 static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
-static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
-                                            uint64_t next_txn);
+static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
+    struct ovsdb_monitor_table *mt, uint64_t next_txn);
 static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
     struct ovsdb_monitor_table *mt, uint64_t unflushed);
 static void ovsdb_monitor_changes_destroy(
@@ -230,7 +233,7 @@ ovsdb_monitor_row_destroy(const struct ovsdb_monitor_table *mt,
     }
 }
 
-static void
+void
 ovsdb_monitor_add_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
                                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
 {
@@ -255,6 +258,7 @@ ovsdb_monitor_create(struct ovsdb *db,
     dbmon->db = db;
     dbmon->n_transactions = 0;
     shash_init(&dbmon->tables);
+    hmap_node_nullify(&dbmon->hmap_node);
 
     ovsdb_monitor_add_jsonrpc_monitor(dbmon, jsonrpc_monitor);
     return dbmon;
@@ -321,7 +325,7 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
     return NULL;
 }
 
-static void
+static struct ovsdb_monitor_changes *
 ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
                                 uint64_t next_txn)
 {
@@ -334,6 +338,8 @@ ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
     changes->n_refs = 1;
     hmap_init(&changes->rows);
     hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
+
+    return changes;
 };
 
 static struct ovsdb_monitor_changes *
@@ -362,7 +368,6 @@ ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
     struct ovsdb_monitor_changes *changes =
                 ovsdb_monitor_table_find_changes(mt, transaction);
     if (changes) {
-        ovs_assert(changes->transaction == transaction);
         if (--changes->n_refs == 0) {
             hmap_remove(&mt->changes, &changes->hmap_node);
             ovsdb_monitor_changes_destroy(changes);
@@ -551,9 +556,6 @@ ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
                 snprintf(uuid, sizeof uuid, UUID_FMT, UUID_ARGS(&row->uuid));
                 json_object_put(table_json, uuid, row_json);
             }
-
-            hmap_remove(&changes->rows, &row->hmap_node);
-            ovsdb_monitor_row_destroy(mt, row);
         }
 
         ovsdb_monitor_table_untrack_changes(mt, prev_txn);
@@ -669,13 +671,16 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
 
         if (mt->select & OJMS_INITIAL) {
             struct ovsdb_row *row;
+            struct ovsdb_monitor_changes *changes;
 
-            if (hmap_is_empty(&mt->changes)) {
-                ovsdb_monitor_table_add_changes(mt, 0);
-            }
-
-            HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
-                ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
+            changes = ovsdb_monitor_table_find_changes(mt, 0);
+            if (!changes) {
+                changes = ovsdb_monitor_table_add_changes(mt, 0);
+                HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
+                    ovsdb_monitor_changes_update(NULL, row, mt, changes);
+                }
+            } else {
+                changes->n_refs++;
             }
         }
     }
@@ -687,6 +692,11 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
 {
     struct jsonrpc_monitor_node *jm;
 
+    if (list_is_empty(&dbmon->jsonrpc_monitors)) {
+        ovsdb_monitor_destroy(dbmon);
+        return;
+    }
+
     /* Find and remove the jsonrpc monitor from the list.  */
     LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
         if (jm->jsonrpc_monitor == jsonrpc_monitor) {
@@ -706,6 +716,101 @@ ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
     OVS_NOT_REACHED();
 }
 
+static bool
+ovsdb_monitor_table_equal(const struct ovsdb_monitor_table *a,
+                          const struct ovsdb_monitor_table *b)
+{
+    size_t i;
+
+    if ((a->table != b->table) ||
+        (a->select != b->select) ||
+        (a->n_columns != b->n_columns)) {
+        return false;
+    }
+
+    for (i = 0; i < a->n_columns; i++) {
+        if ((a->columns[i].column != b->columns[i].column) ||
+            (a->columns[i].select != b->columns[i].select)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static bool
+ovsdb_monitor_equal(const struct ovsdb_monitor *a,
+                    const struct ovsdb_monitor *b)
+{
+    struct shash_node *node;
+
+    if (shash_count(&a->tables) != shash_count(&b->tables)) {
+        return false;
+    }
+
+    SHASH_FOR_EACH(node, &a->tables) {
+        const struct ovsdb_monitor_table *mta = node->data;
+        const struct ovsdb_monitor_table *mtb;
+
+        mtb = shash_find_data(&b->tables, node->name);
+        if (!mtb) {
+            return false;
+        }
+
+        if (!ovsdb_monitor_table_equal(mta, mtb)) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
+static size_t
+ovsdb_monitor_hash(const struct ovsdb_monitor *dbmon, size_t basis)
+{
+    const struct shash_node **nodes;
+    size_t i, j, n;
+
+    nodes = shash_sort(&dbmon->tables);
+    n = shash_count(&dbmon->tables);
+
+    for (i = 0; i < n; i++) {
+        struct ovsdb_monitor_table *mt = nodes[i]->data;
+
+        basis = hash_pointer(mt->table, basis);
+        basis = hash_3words(mt->select, mt->n_columns, basis);
+
+        for (j = 0; j < mt->n_columns; j++) {
+            basis = hash_pointer(mt->columns[j].column, basis);
+            basis = hash_2words(mt->columns[j].select, basis);
+        }
+    }
+    free(nodes);
+
+    return basis;
+}
+
+struct ovsdb_monitor *
+ovsdb_monitor_add(struct ovsdb_monitor *new_dbmon)
+{
+    struct ovsdb_monitor *dbmon;
+    size_t hash;
+
+    /* New_dbmon should be associated with only one jsonrpc
+     * connections.  */
+    ovs_assert(list_is_singleton(&new_dbmon->jsonrpc_monitors));
+
+    hash = ovsdb_monitor_hash(new_dbmon, 0);
+    HMAP_FOR_EACH_WITH_HASH(dbmon, hmap_node, hash, &ovsdb_monitors) {
+        if (ovsdb_monitor_equal(dbmon,  new_dbmon)) {
+            return dbmon;
+        }
+    }
+
+    hmap_insert(&ovsdb_monitors, &new_dbmon->hmap_node, hash);
+    return new_dbmon;
+}
+
 static void
 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 {
@@ -713,6 +818,10 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 
     list_remove(&dbmon->replica.node);
 
+    if (!hmap_node_is_null(&dbmon->hmap_node)) {
+        hmap_remove(&ovsdb_monitors, &dbmon->hmap_node);
+    }
+
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
         struct ovsdb_monitor_changes *changes, *next;