ovsdb-monitor: allow multiple of 'ovsdb_monitor_changes' in each ovsdb monitor table
[cascardo/ovs.git] / ovsdb / monitor.c
index a1aeb5f..9541f3e 100644 (file)
@@ -28,6 +28,7 @@
 #include "ovsdb.h"
 #include "row.h"
 #include "simap.h"
+#include "hash.h"
 #include "table.h"
 #include "timeval.h"
 #include "transaction.h"
@@ -47,8 +48,14 @@ static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class;
 struct ovsdb_monitor {
     struct ovsdb_replica replica;
     struct shash tables;     /* Holds "struct ovsdb_monitor_table"s. */
-    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
+    struct ovs_list jsonrpc_monitors;  /* Contains "jsonrpc_monitor_node"s. */
     struct ovsdb *db;
+    uint64_t n_transactions;      /* Count number of committed transactions. */
+};
+
+struct jsonrpc_monitor_node {
+    struct ovsdb_jsonrpc_monitor *jsonrpc_monitor;
+    struct ovs_list node;
 };
 
 /* A particular column being monitored. */
@@ -65,6 +72,25 @@ struct ovsdb_monitor_row {
     struct ovsdb_datum *new;    /* New data, NULL for a deleted row. */
 };
 
+/* Contains 'struct ovsdb_monitor_row's for rows that have been
+ * updated but not yet flushed to all the jsonrpc connection.
+ *
+ * 'n_refs' represent the number of jsonrpc connections that have
+ * not received updates. Generate the update for the last jsonprc
+ * connection will also destroy the whole "struct ovsdb_monitor_changes"
+ * object.
+ *
+ * 'transaction' stores the first update's transaction id.
+ * */
+struct ovsdb_monitor_changes {
+    struct ovsdb_monitor_table *mt;
+    struct hmap rows;
+    int n_refs;
+    uint64_t transaction;
+    struct hmap_node hmap_node;  /* Element in ovsdb_monitor_tables' changes
+                                    hmap.  */
+};
+
 /* A particular table being monitored. */
 struct ovsdb_monitor_table {
     const struct ovsdb_table *table;
@@ -77,11 +103,20 @@ struct ovsdb_monitor_table {
     struct ovsdb_monitor_column *columns;
     size_t n_columns;
 
-    /* Contains 'struct ovsdb_monitor_row's for rows that have been
-     * updated but not yet flushed to the jsonrpc connection. */
+    /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
     struct hmap changes;
 };
 
+static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
+static void ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+                                            uint64_t next_txn);
+static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
+    struct ovsdb_monitor_table *mt, uint64_t unflushed);
+static void ovsdb_monitor_changes_destroy(
+                                  struct ovsdb_monitor_changes *changes);
+static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+                                  uint64_t unflushed);
+
 static int
 compare_ovsdb_monitor_column(const void *a_, const void *b_)
 {
@@ -98,15 +133,16 @@ ovsdb_monitor_cast(struct ovsdb_replica *replica)
     return CONTAINER_OF(replica, struct ovsdb_monitor, replica);
 }
 
-/* Finds and returns the ovsdb_monitor_row in 'mt->changes' for the
+/* Finds and returns the ovsdb_monitor_row in 'mt->changes->rows' for the
  * given 'uuid', or NULL if there is no such row. */
 static struct ovsdb_monitor_row *
-ovsdb_monitor_row_find(const struct ovsdb_monitor_table *mt,
-                       const struct uuid *uuid)
+ovsdb_monitor_changes_row_find(const struct ovsdb_monitor_changes *changes,
+                               const struct uuid *uuid)
 {
     struct ovsdb_monitor_row *row;
 
-    HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid), &mt->changes) {
+    HMAP_FOR_EACH_WITH_HASH (row, hmap_node, uuid_hash(uuid),
+                             &changes->rows) {
         if (uuid_equals(uuid, &row->uuid)) {
             return row;
         }
@@ -199,15 +235,21 @@ ovsdb_monitor_create(struct ovsdb *db,
                      struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
 {
     struct ovsdb_monitor *dbmon;
+    struct jsonrpc_monitor_node *jm;
 
     dbmon = xzalloc(sizeof *dbmon);
 
     ovsdb_replica_init(&dbmon->replica, &ovsdb_jsonrpc_replica_class);
     ovsdb_add_replica(db, &dbmon->replica);
-    dbmon->jsonrpc_monitor = jsonrpc_monitor;
+    list_init(&dbmon->jsonrpc_monitors);
     dbmon->db = db;
+    dbmon->n_transactions = 0;
     shash_init(&dbmon->tables);
 
+    jm = xzalloc(sizeof *jm);
+    jm->jsonrpc_monitor = jsonrpc_monitor;
+    list_push_back(&dbmon->jsonrpc_monitors, &jm->node);
+
     return dbmon;
 }
 
@@ -219,8 +261,8 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
 
     mt = xzalloc(sizeof *mt);
     mt->table = table;
-    hmap_init(&mt->changes);
     shash_add(&m->tables, table->schema->name, mt);
+    hmap_init(&mt->changes);
 }
 
 void
@@ -272,6 +314,84 @@ ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *m,
     return NULL;
 }
 
+static void
+ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
+                                uint64_t next_txn)
+{
+    struct ovsdb_monitor_changes *changes;
+
+    changes = xzalloc(sizeof *changes);
+
+    changes->transaction = next_txn;
+    changes->mt = mt;
+    changes->n_refs = 1;
+    hmap_init(&changes->rows);
+    hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
+};
+
+static struct ovsdb_monitor_changes *
+ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
+                                 uint64_t transaction)
+{
+    struct ovsdb_monitor_changes *changes;
+    size_t hash = hash_uint64(transaction);
+
+    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
+        if (changes->transaction == transaction) {
+            return changes;
+        }
+    }
+
+    return NULL;
+}
+
+/* Stop currently tracking changes to table 'mt' since 'transaction'.
+ *
+ * Return 'true' if the 'transaction' is being tracked. 'false' otherwise. */
+static void
+ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
+                                    uint64_t transaction)
+{
+    struct ovsdb_monitor_changes *changes =
+                ovsdb_monitor_table_find_changes(mt, transaction);
+    if (changes) {
+        ovs_assert(changes->transaction == transaction);
+        if (--changes->n_refs == 0) {
+            hmap_remove(&mt->changes, &changes->hmap_node);
+            ovsdb_monitor_changes_destroy(changes);
+        }
+    }
+}
+
+/* Start tracking changes to table 'mt' begins from 'transaction' inclusive.
+ */
+static void
+ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
+                                  uint64_t transaction)
+{
+    struct ovsdb_monitor_changes *changes;
+
+    changes = ovsdb_monitor_table_find_changes(mt, transaction);
+    if (changes) {
+        changes->n_refs++;
+    } else {
+        ovsdb_monitor_table_add_changes(mt, transaction);
+    }
+}
+
+static void
+ovsdb_monitor_changes_destroy(struct ovsdb_monitor_changes *changes)
+{
+    struct ovsdb_monitor_row *row, *next;
+
+    HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
+        hmap_remove(&changes->rows, &row->hmap_node);
+        ovsdb_monitor_row_destroy(changes->mt, row);
+    }
+    hmap_destroy(&changes->rows);
+    free(changes);
+}
+
 /* Returns JSON for a <row-update> (as described in RFC 7047) for 'row' within
  * 'mt', or NULL if no row update should be sent.
  *
@@ -362,15 +482,23 @@ ovsdb_monitor_compose_row_update(
  *
  * The caller should specify 'initial' as true if the returned JSON is going to
  * be used as part of the initial reply to a "monitor" request, false if it is
- * going to be used as part of an "update" notification. */
+ * going to be used as part of an "update" notification.
+ *
+ * 'unflushed' should point to value that is the transaction ID that did
+ * was not updated. The update contains changes between
+ * ['unflushed, ovsdb->n_transcations]. Before the function returns, this
+ * value will be updated to ovsdb->n_transactions + 1, ready for the next
+ * update.  */
 struct json *
-ovsdb_monitor_compose_table_update(
-    const struct ovsdb_monitor *dbmon, bool initial)
+ovsdb_monitor_compose_update(const struct ovsdb_monitor *dbmon,
+                             bool initial, uint64_t *unflushed)
 {
     struct shash_node *node;
     unsigned long int *changed;
     struct json *json;
     size_t max_columns;
+    uint64_t prev_txn = *unflushed;
+    uint64_t next_txn = dbmon->n_transactions + 1;
 
     max_columns = 0;
     SHASH_FOR_EACH (node, &dbmon->tables) {
@@ -384,9 +512,16 @@ ovsdb_monitor_compose_table_update(
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
         struct ovsdb_monitor_row *row, *next;
+        struct ovsdb_monitor_changes *changes;
         struct json *table_json = NULL;
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
+        changes = ovsdb_monitor_table_find_changes(mt, prev_txn);
+        if (!changes) {
+            ovsdb_monitor_table_track_changes(mt, next_txn);
+            continue;
+        }
+
+        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &changes->rows) {
             struct json *row_json;
 
             row_json = ovsdb_monitor_compose_row_update(
@@ -410,28 +545,25 @@ ovsdb_monitor_compose_table_update(
                 json_object_put(table_json, uuid, row_json);
             }
 
-            hmap_remove(&mt->changes, &row->hmap_node);
+            hmap_remove(&changes->rows, &row->hmap_node);
             ovsdb_monitor_row_destroy(mt, row);
         }
+
+        ovsdb_monitor_table_untrack_changes(mt, prev_txn);
+        ovsdb_monitor_table_track_changes(mt, next_txn);
     }
 
+    *unflushed = next_txn;
     free(changed);
     return json;
 }
 
 bool
-ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon)
+ovsdb_monitor_needs_flush(struct ovsdb_monitor *dbmon,
+                          uint64_t next_transaction)
 {
-    struct shash_node *node;
-
-    SHASH_FOR_EACH (node, &dbmon->tables) {
-        struct ovsdb_monitor_table *mt = node->data;
-
-        if (!hmap_is_empty(&mt->changes)) {
-            return true;
-        }
-    }
-    return false;
+    ovs_assert(next_transaction <= dbmon->n_transactions + 1);
+    return (next_transaction <= dbmon->n_transactions);
 }
 
 void
@@ -458,6 +590,38 @@ ovsdb_monitor_init_aux(struct ovsdb_monitor_aux *aux,
     aux->mt = NULL;
 }
 
+static void
+ovsdb_monitor_changes_update(const struct ovsdb_row *old,
+                             const struct ovsdb_row *new,
+                             const struct ovsdb_monitor_table *mt,
+                             struct ovsdb_monitor_changes *changes)
+{
+    const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
+    struct ovsdb_monitor_row *change;
+
+    change = ovsdb_monitor_changes_row_find(changes, uuid);
+    if (!change) {
+        change = xzalloc(sizeof *change);
+        hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
+        change->uuid = *uuid;
+        change->old = clone_monitor_row_data(mt, old);
+        change->new = clone_monitor_row_data(mt, new);
+    } else {
+        if (new) {
+            update_monitor_row_data(mt, new, change->new);
+        } else {
+            free_monitor_row_data(mt, change->new);
+            change->new = NULL;
+
+            if (!change->old) {
+                /* This row was added then deleted.  Forget about it. */
+                hmap_remove(&changes->rows, &change->hmap_node);
+                free(change);
+            }
+        }
+    }
+}
+
 static bool
 ovsdb_monitor_change_cb(const struct ovsdb_row *old,
                         const struct ovsdb_row *new,
@@ -467,9 +631,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     struct ovsdb_monitor_aux *aux = aux_;
     const struct ovsdb_monitor *m = aux->monitor;
     struct ovsdb_table *table = new ? new->table : old->table;
-    const struct uuid *uuid = ovsdb_row_get_uuid(new ? new : old);
-    struct ovsdb_monitor_row *change;
     struct ovsdb_monitor_table *mt;
+    struct ovsdb_monitor_changes *changes;
 
     if (!aux->mt || table != aux->mt->table) {
         aux->mt = shash_find_data(&m->tables, table->schema->name);
@@ -481,36 +644,17 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
     }
     mt = aux->mt;
 
-    change = ovsdb_monitor_row_find(mt, uuid);
-    if (!change) {
-        change = xmalloc(sizeof *change);
-        hmap_insert(&mt->changes, &change->hmap_node, uuid_hash(uuid));
-        change->uuid = *uuid;
-        change->old = clone_monitor_row_data(mt, old);
-        change->new = clone_monitor_row_data(mt, new);
-    } else {
-        if (new) {
-            update_monitor_row_data(mt, new, change->new);
-        } else {
-            free_monitor_row_data(mt, change->new);
-            change->new = NULL;
-
-            if (!change->old) {
-                /* This row was added then deleted.  Forget about it. */
-                hmap_remove(&mt->changes, &change->hmap_node);
-                free(change);
-            }
-        }
+    HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
+        ovsdb_monitor_changes_update(old, new, mt, changes);
     }
     return true;
 }
 
-struct json *
+void
 ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
 {
     struct ovsdb_monitor_aux aux;
     struct shash_node *node;
-    struct json *json;
 
     ovsdb_monitor_init_aux(&aux, dbmon);
     SHASH_FOR_EACH (node, &dbmon->tables) {
@@ -519,16 +663,43 @@ ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
         if (mt->select & OJMS_INITIAL) {
             struct ovsdb_row *row;
 
+            if (hmap_is_empty(&mt->changes)) {
+                ovsdb_monitor_table_add_changes(mt, 0);
+            }
+
             HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
                 ovsdb_monitor_change_cb(NULL, row, NULL, &aux);
             }
         }
     }
-    json = ovsdb_monitor_compose_table_update(dbmon, true);
-    return json ? json : json_object_create();
 }
 
 void
+ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
+                   struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
+{
+    struct jsonrpc_monitor_node *jm;
+
+    /* Find and remove the jsonrpc monitor from the list.  */
+    LIST_FOR_EACH(jm, node, &dbmon->jsonrpc_monitors) {
+        if (jm->jsonrpc_monitor == jsonrpc_monitor) {
+            list_remove(&jm->node);
+            free(jm);
+
+            /* Destroy ovsdb monitor if this is the last user.  */
+            if (list_is_empty(&dbmon->jsonrpc_monitors)) {
+                ovsdb_monitor_destroy(dbmon);
+            }
+
+            return;
+        };
+    }
+
+    /* Should never reach here. jsonrpc_monitor should be on the list.  */
+    OVS_NOT_REACHED();
+}
+
+static void
 ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 {
     struct shash_node *node;
@@ -537,14 +708,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
 
     SHASH_FOR_EACH (node, &dbmon->tables) {
         struct ovsdb_monitor_table *mt = node->data;
-        struct ovsdb_monitor_row *row, *next;
+        struct ovsdb_monitor_changes *changes, *next;
 
-        HMAP_FOR_EACH_SAFE (row, next, hmap_node, &mt->changes) {
-            hmap_remove(&mt->changes, &row->hmap_node);
-            ovsdb_monitor_row_destroy(mt, row);
+        HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->changes) {
+            hmap_remove(&mt->changes, &changes->hmap_node);
+            ovsdb_monitor_changes_destroy(changes);
         }
-        hmap_destroy(&mt->changes);
-
         free(mt->columns);
         free(mt);
     }
@@ -562,6 +731,7 @@ ovsdb_monitor_commit(struct ovsdb_replica *replica,
 
     ovsdb_monitor_init_aux(&aux, m);
     ovsdb_txn_for_each_change(txn, ovsdb_monitor_change_cb, &aux);
+    m->n_transactions++;
 
     return NULL;
 }
@@ -570,9 +740,14 @@ static void
 ovsdb_monitor_destroy_callback(struct ovsdb_replica *replica)
 {
     struct ovsdb_monitor *dbmon = ovsdb_monitor_cast(replica);
-    struct ovsdb_jsonrpc_monitor *m = dbmon->jsonrpc_monitor;
+    struct jsonrpc_monitor_node *jm, *next;
 
-    ovsdb_jsonrpc_monitor_destroy(m);
+    /* Delete all front end monitors. Removing the last front
+     * end monitor will also destroy the corresponding 'ovsdb_monitor'.
+     * ovsdb monitor will also be destroied.  */
+    LIST_FOR_EACH_SAFE(jm, next, node, &dbmon->jsonrpc_monitors) {
+        ovsdb_jsonrpc_monitor_destroy(jm->jsonrpc_monitor);
+    }
 }
 
 static const struct ovsdb_replica_class ovsdb_jsonrpc_replica_class = {